[lxc-devel] [lxd/master] Serialize reads to the cluster database

freeekanayaka on Github lxc-bot at linuxcontainers.org
Thu May 24 09:21:57 UTC 2018


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 1372 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180524/bb634354/attachment.bin>
-------------- next part --------------
From ccb052fa4ed1033e4e121063783fee947f9e552d Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 24 May 2018 05:29:40 +0000
Subject: [PATCH] Serialize reads to the cluster database

This change makes us use transactions in test code too, as well as in the
/internal/sql API endpoint, neither of which was doing so before.

It also drops concurrent calls in the GET /containers and cluster heartbeat
code. At the moment they can hardly take advantage of concurrency, because the
nodes serialize db reads anyway, and db reads are currently a substantial part
of the total time spent handling an API request.

The lower-level change to actually serialize reads was committed partly in
go-grpc-sql and partly in dqlite.

This should mitigate #4548 for now.

Moving forward, we should start optimizing dqlite to be faster (I believe there
are substantial gains to be made there), and perhaps also change the LXD code
that interacts with the database to be more efficient (e.g. caching prepared
statements, not entering/exiting a transaction for every query, etc.).

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/api_internal.go        | 20 ++++++++++-----
 lxd/cluster/heartbeat.go   |  7 +-----
 lxd/containers_get.go      |  6 +----
 lxd/db/db_internal_test.go | 63 ++++++++++++++++++++++++++++++++++------------
 lxd/profiles_test.go       | 11 ++++++--
 5 files changed, 72 insertions(+), 35 deletions(-)

diff --git a/lxd/api_internal.go b/lxd/api_internal.go
index b1741eccb..f86d78b85 100644
--- a/lxd/api_internal.go
+++ b/lxd/api_internal.go
@@ -189,10 +189,18 @@ func internalSQLPost(d *Daemon, r *http.Request) Response {
 		}
 
 		result := internalSQLResult{}
+		tx, err := db.Begin()
+		if err != nil {
+			return SmartError(err)
+		}
 		if strings.HasPrefix(strings.ToUpper(query), "SELECT") {
-			err = internalSQLSelect(db, query, &result)
+			err = internalSQLSelect(tx, query, &result)
+			tx.Rollback()
 		} else {
-			err = internalSQLExec(db, query, &result)
+			err = internalSQLExec(tx, query, &result)
+			if err == nil {
+				err = tx.Commit()
+			}
 		}
 		if err != nil {
 			return SmartError(err)
@@ -202,9 +210,9 @@ func internalSQLPost(d *Daemon, r *http.Request) Response {
 	return SyncResponse(true, batch)
 }
 
-func internalSQLSelect(db *sql.DB, query string, result *internalSQLResult) error {
+func internalSQLSelect(tx *sql.Tx, query string, result *internalSQLResult) error {
 	result.Type = "select"
-	rows, err := db.Query(query)
+	rows, err := tx.Query(query)
 	if err != nil {
 		return errors.Wrap(err, "failed to execute query")
 	}
@@ -240,9 +248,9 @@ func internalSQLSelect(db *sql.DB, query string, result *internalSQLResult) erro
 	return nil
 }
 
-func internalSQLExec(db *sql.DB, query string, result *internalSQLResult) error {
+func internalSQLExec(tx *sql.Tx, query string, result *internalSQLResult) error {
 	result.Type = "exec"
-	r, err := db.Exec(query)
+	r, err := tx.Exec(query)
 	if err != nil {
 		return errors.Wrapf(err, "failed to exec query")
 	}
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index 63619a800..67a27ff58 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
-	"sync"
 	"time"
 
 	"github.com/hashicorp/raft"
@@ -71,12 +70,9 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
 			logger.Warnf("Failed to get current cluster nodes: %v", err)
 			return
 		}
-		wg := sync.WaitGroup{}
-		wg.Add(len(nodes))
 		heartbeats := make([]time.Time, len(nodes))
 		for i, node := range nodes {
-			go func(i int, address string) {
-				defer wg.Done()
+			func(i int, address string) {
 				var err error
 				// Only send actual requests to other nodes
 				if address != nodeAddress {
@@ -90,7 +86,6 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
 				}
 			}(i, node.Address)
 		}
-		wg.Wait()
 
 		// If the context has been cancelled, return immediately.
 		if ctx.Err() != nil {
diff --git a/lxd/containers_get.go b/lxd/containers_get.go
index 3a4fe94e4..a54c8964d 100644
--- a/lxd/containers_get.go
+++ b/lxd/containers_get.go
@@ -80,7 +80,6 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) {
 		resultMu.Unlock()
 	}
 
-	wg := sync.WaitGroup{}
 	for address, containers := range result {
 		// If this is an internal request from another cluster node,
 		// ignore containers from other nodes, and return only the ones
@@ -101,9 +100,7 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) {
 		// For recursion requests we need to fetch the state of remote
 		// containers from their respective nodes.
 		if recursion && address != "" && !isClusterNotification(r) {
-			wg.Add(1)
-			go func(address string, containers []string) {
-				defer wg.Done()
+			func(address string, containers []string) {
 				cert := d.endpoints.NetworkCert()
 
 				cs, err := doContainersGetFromNode(address, cert)
@@ -138,7 +135,6 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) {
 			}
 		}
 	}
-	wg.Wait()
 
 	if !recursion {
 		return resultString, nil
diff --git a/lxd/db/db_internal_test.go b/lxd/db/db_internal_test.go
index 3297ed2ba..41962764f 100644
--- a/lxd/db/db_internal_test.go
+++ b/lxd/db/db_internal_test.go
@@ -39,7 +39,11 @@ type dbTestSuite struct {
 
 func (s *dbTestSuite) SetupTest() {
 	s.db, s.cleanup = s.CreateTestDb()
-	_, err := s.db.DB().Exec(fixtures)
+
+	tx, commit := s.CreateTestTx()
+	defer commit()
+
+	_, err := tx.Exec(fixtures)
 	s.Nil(err)
 }
 
@@ -61,6 +65,16 @@ func (s *dbTestSuite) CreateTestDb() (*Cluster, func()) {
 	return db, cleanup
 }
 
+// CreateTestTx begins a transaction on the suite's test DB and returns it along with a func that commits it and asserts the commit succeeded.
+func (s *dbTestSuite) CreateTestTx() (*sql.Tx, func()) {
+	tx, err := s.db.DB().Begin()
+	s.Nil(err)
+	commit := func() {
+		s.Nil(tx.Commit())
+	}
+	return tx, commit
+}
+
 func TestDBTestSuite(t *testing.T) {
 	suite.Run(t, new(dbTestSuite))
 }
@@ -73,30 +87,33 @@ func (s *dbTestSuite) Test_deleting_a_container_cascades_on_related_tables() {
 	// Drop the container we just created.
 	statements = `DELETE FROM containers WHERE name = 'thename';`
 
-	_, err = s.db.DB().Exec(statements)
+	tx, commit := s.CreateTestTx()
+	defer commit()
+
+	_, err = tx.Exec(statements)
 	s.Nil(err, "Error deleting container!")
 
 	// Make sure there are 0 container_profiles entries left.
 	statements = `SELECT count(*) FROM containers_profiles;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting a container didn't delete the profile association!")
 
 	// Make sure there are 0 containers_config entries left.
 	statements = `SELECT count(*) FROM containers_config;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting a container didn't delete the associated container_config!")
 
 	// Make sure there are 0 containers_devices entries left.
 	statements = `SELECT count(*) FROM containers_devices;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting a container didn't delete the associated container_devices!")
 
 	// Make sure there are 0 containers_devices_config entries left.
 	statements = `SELECT count(*) FROM containers_devices_config;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting a container didn't delete the associated container_devices_config!")
 }
@@ -109,30 +126,33 @@ func (s *dbTestSuite) Test_deleting_a_profile_cascades_on_related_tables() {
 	// Drop the profile we just created.
 	statements = `DELETE FROM profiles WHERE name = 'theprofile';`
 
-	_, err = s.db.DB().Exec(statements)
+	tx, commit := s.CreateTestTx()
+	defer commit()
+
+	_, err = tx.Exec(statements)
 	s.Nil(err)
 
 	// Make sure there are 0 container_profiles entries left.
 	statements = `SELECT count(*) FROM containers_profiles WHERE profile_id = 2;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting a profile didn't delete the container association!")
 
 	// Make sure there are 0 profiles_devices entries left.
 	statements = `SELECT count(*) FROM profiles_devices WHERE profile_id == 2;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting a profile didn't delete the related profiles_devices!")
 
 	// Make sure there are 0 profiles_config entries left.
 	statements = `SELECT count(*) FROM profiles_config WHERE profile_id == 2;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting a profile didn't delete the related profiles_config! There are %d left")
 
 	// Make sure there are 0 profiles_devices_config entries left.
 	statements = `SELECT count(*) FROM profiles_devices_config WHERE profile_device_id == 3;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting a profile didn't delete the related profiles_devices_config!")
 }
@@ -145,17 +165,20 @@ func (s *dbTestSuite) Test_deleting_an_image_cascades_on_related_tables() {
 	// Drop the image we just created.
 	statements = `DELETE FROM images;`
 
-	_, err = s.db.DB().Exec(statements)
+	tx, commit := s.CreateTestTx()
+	defer commit()
+
+	_, err = tx.Exec(statements)
 	s.Nil(err)
 	// Make sure there are 0 images_aliases entries left.
 	statements = `SELECT count(*) FROM images_aliases;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting an image didn't delete the image alias association!")
 
 	// Make sure there are 0 images_properties entries left.
 	statements = `SELECT count(*) FROM images_properties;`
-	err = s.db.DB().QueryRow(statements).Scan(&count)
+	err = tx.QueryRow(statements).Scan(&count)
 	s.Nil(err)
 	s.Equal(count, 0, "Deleting an image didn't delete the related images_properties!")
 }
@@ -250,9 +273,13 @@ func (s *dbTestSuite) Test_ContainerConfig() {
 	var result map[string]string
 	var expected map[string]string
 
-	_, err = s.db.DB().Exec("INSERT INTO containers_config (container_id, key, value) VALUES (1, 'something', 'something else');")
+	tx, commit := s.CreateTestTx()
+
+	_, err = tx.Exec("INSERT INTO containers_config (container_id, key, value) VALUES (1, 'something', 'something else');")
 	s.Nil(err)
 
+	commit()
+
 	result, err = s.db.ContainerConfig(1)
 	s.Nil(err)
 
@@ -269,9 +296,13 @@ func (s *dbTestSuite) Test_dbProfileConfig() {
 	var result map[string]string
 	var expected map[string]string
 
-	_, err = s.db.DB().Exec("INSERT INTO profiles_config (profile_id, key, value) VALUES (2, 'something', 'something else');")
+	tx, commit := s.CreateTestTx()
+
+	_, err = tx.Exec("INSERT INTO profiles_config (profile_id, key, value) VALUES (2, 'something', 'something else');")
 	s.Nil(err)
 
+	commit()
+
 	result, err = s.db.ProfileConfig("theprofile")
 	s.Nil(err)
 
diff --git a/lxd/profiles_test.go b/lxd/profiles_test.go
index ab2b60b6a..fd2dc3a7b 100644
--- a/lxd/profiles_test.go
+++ b/lxd/profiles_test.go
@@ -9,7 +9,10 @@ import (
 func Test_removing_a_profile_deletes_associated_configuration_entries(t *testing.T) {
 	cluster, cleanup := db.NewTestCluster(t)
 	defer cleanup()
-	db := cluster.DB()
+	tx, err := cluster.DB().Begin()
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	// Insert a container and a related profile. Dont't forget that the profile
 	// we insert is profile ID 2 (there is a default profile already).
@@ -21,7 +24,11 @@ func Test_removing_a_profile_deletes_associated_configuration_entries(t *testing
     INSERT INTO profiles_config (key, value, profile_id) VALUES ('thekey', 'thevalue', 2);
     INSERT INTO profiles_devices_config (profile_device_id, key, value) VALUES (1, 'something', 'boring');`
 
-	_, err := db.Exec(statements)
+	_, err = tx.Exec(statements)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = tx.Commit()
 	if err != nil {
 		t.Fatal(err)
 	}


More information about the lxc-devel mailing list