[lxc-devel] [lxd/master] Always use transactions when querying the database

freeekanayaka on Github lxc-bot at linuxcontainers.org
Thu Mar 22 09:14:57 UTC 2018


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 753 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180322/7449e85c/attachment.bin>
-------------- next part --------------
From 6c47e79abe7bd3c44bc4e60638fcb0ffbbf3bb00 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 21 Mar 2018 13:08:43 +0000
Subject: [PATCH] Always use transactions when querying the database

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/db/certificates.go    |  44 ++++++-----
 lxd/db/cluster/open.go    |  14 +++-
 lxd/db/containers.go      |  10 +--
 lxd/db/db.go              | 198 +++++++++++++++++-----------------------------
 lxd/db/images.go          |  20 ++---
 lxd/db/migration.go       |   4 +
 lxd/db/networks.go        |   2 +-
 lxd/db/profiles.go        |   4 +-
 lxd/db/query/retry.go     |   2 +-
 lxd/db/storage_pools.go   |   5 +-
 lxd/db/storage_volumes.go |   6 +-
 11 files changed, 138 insertions(+), 171 deletions(-)

diff --git a/lxd/db/certificates.go b/lxd/db/certificates.go
index e773685e5..3f9317f40 100644
--- a/lxd/db/certificates.go
+++ b/lxd/db/certificates.go
@@ -12,28 +12,34 @@ type CertInfo struct {
 
 // CertificatesGet returns all certificates from the DB as CertBaseInfo objects.
 func (c *Cluster) CertificatesGet() (certs []*CertInfo, err error) {
-	rows, err := dbQuery(
-		c.db,
-		"SELECT id, fingerprint, type, name, certificate FROM certificates",
-	)
+	err = c.Transaction(func(tx *ClusterTx) error {
+		rows, err := tx.tx.Query(
+			"SELECT id, fingerprint, type, name, certificate FROM certificates",
+		)
+		if err != nil {
+			return err
+		}
+
+		defer rows.Close()
+
+		for rows.Next() {
+			cert := new(CertInfo)
+			rows.Scan(
+				&cert.ID,
+				&cert.Fingerprint,
+				&cert.Type,
+				&cert.Name,
+				&cert.Certificate,
+			)
+			certs = append(certs, cert)
+		}
+
+		return rows.Err()
+	})
 	if err != nil {
 		return certs, err
 	}
 
-	defer rows.Close()
-
-	for rows.Next() {
-		cert := new(CertInfo)
-		rows.Scan(
-			&cert.ID,
-			&cert.Fingerprint,
-			&cert.Type,
-			&cert.Name,
-			&cert.Certificate,
-		)
-		certs = append(certs, cert)
-	}
-
 	return certs, nil
 }
 
@@ -104,7 +110,7 @@ func (c *Cluster) CertSave(cert *CertInfo) error {
 
 // CertDelete deletes a certificate from the db.
 func (c *Cluster) CertDelete(fingerprint string) error {
-	_, err := exec(c.db, "DELETE FROM certificates WHERE fingerprint=?", fingerprint)
+	err := exec(c.db, "DELETE FROM certificates WHERE fingerprint=?", fingerprint)
 	if err != nil {
 		return err
 	}
diff --git a/lxd/db/cluster/open.go b/lxd/db/cluster/open.go
index c37fcd4e7..0ca5d50af 100644
--- a/lxd/db/cluster/open.go
+++ b/lxd/db/cluster/open.go
@@ -105,18 +105,28 @@ func EnsureSchema(db *sql.DB, address string) (bool, error) {
 	// 1. This is needed for referential integrity with other tables. Also,
 	// create a default profile.
 	if initial == 0 {
+		tx, err := db.Begin()
+		if err != nil {
+			return false, err
+		}
 		stmt := `
 INSERT INTO nodes(id, name, address, schema, api_extensions) VALUES(1, 'none', '0.0.0.0', ?, ?)
 `
-		_, err := db.Exec(stmt, SchemaVersion, apiExtensions)
+		_, err = tx.Exec(stmt, SchemaVersion, apiExtensions)
 		if err != nil {
+			tx.Rollback()
 			return false, err
 		}
 
 		stmt = `
 INSERT INTO profiles (name, description) VALUES ('default', 'Default LXD profile')
 `
-		_, err = db.Exec(stmt)
+		_, err = tx.Exec(stmt)
+		if err != nil {
+			tx.Rollback()
+			return false, err
+		}
+		err = tx.Commit()
 		if err != nil {
 			return false, err
 		}
diff --git a/lxd/db/containers.go b/lxd/db/containers.go
index 866f82bba..4712246ba 100644
--- a/lxd/db/containers.go
+++ b/lxd/db/containers.go
@@ -326,7 +326,7 @@ func (c *Cluster) ContainerRemove(name string) error {
 		return err
 	}
 
-	_, err = exec(c.db, "DELETE FROM containers WHERE id=?", id)
+	err = exec(c.db, "DELETE FROM containers WHERE id=?", id)
 	if err != nil {
 		return err
 	}
@@ -539,7 +539,7 @@ func (c *Cluster) ContainerConfigGet(id int, key string) (string, error) {
 }
 
 func (c *Cluster) ContainerConfigRemove(id int, name string) error {
-	_, err := exec(c.db, "DELETE FROM containers_config WHERE key=? AND container_id=?", name, id)
+	err := exec(c.db, "DELETE FROM containers_config WHERE key=? AND container_id=?", name, id)
 	return err
 }
 
@@ -549,7 +549,7 @@ func (c *Cluster) ContainerSetStateful(id int, stateful bool) error {
 		statefulInt = 1
 	}
 
-	_, err := exec(c.db, "UPDATE containers SET stateful=? WHERE id=?", statefulInt, id)
+	err := exec(c.db, "UPDATE containers SET stateful=? WHERE id=?", statefulInt, id)
 	return err
 }
 
@@ -648,7 +648,7 @@ func (c *Cluster) ContainersList(cType ContainerType) ([]string, error) {
 
 func (c *Cluster) ContainersResetState() error {
 	// Reset all container states
-	_, err := exec(c.db, "DELETE FROM containers_config WHERE key='volatile.last_state.power'")
+	err := exec(c.db, "DELETE FROM containers_config WHERE key='volatile.last_state.power'")
 	return err
 }
 
@@ -739,7 +739,7 @@ func ContainerUpdate(tx *sql.Tx, id int, description string, architecture int, e
 
 func (c *Cluster) ContainerLastUsedUpdate(id int, date time.Time) error {
 	stmt := `UPDATE containers SET last_use_date=? WHERE id=?`
-	_, err := exec(c.db, stmt, date, id)
+	err := exec(c.db, stmt, date, id)
 	return err
 }
 
diff --git a/lxd/db/db.go b/lxd/db/db.go
index f6e7c0d0f..bf550d8bb 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -361,117 +361,85 @@ func begin(db *sql.DB) (*sql.Tx, error) {
 }
 
 func TxCommit(tx *sql.Tx) error {
-	for i := 0; i < 1000; i++ {
-		err := tx.Commit()
-		if err == nil || err == sql.ErrTxDone { // Ignore duplicate commits/rollbacks
-			return nil
-		}
-		if !query.IsRetriableError(err) {
-			logger.Debugf("Txcommit: error %q", err)
-			return err
-		}
-		time.Sleep(30 * time.Millisecond)
+	err := tx.Commit()
+	if err == nil || err == sql.ErrTxDone { // Ignore duplicate commits/rollbacks
+		return nil
 	}
-
-	logger.Debugf("Txcommit: db still locked")
-	logger.Debugf(logger.GetStack())
-	return fmt.Errorf("DB is locked")
+	return err
 }
 
 func dbQueryRowScan(db *sql.DB, q string, args []interface{}, outargs []interface{}) error {
-	for i := 0; i < 1000; i++ {
-		err := db.QueryRow(q, args...).Scan(outargs...)
-		if err == nil {
-			return nil
-		}
-		if isNoMatchError(err) {
-			return err
-		}
-		if !query.IsRetriableError(err) {
-			return err
-		}
-		time.Sleep(30 * time.Millisecond)
-	}
-
-	logger.Debugf("DbQueryRowScan: query %q args %q, DB still locked", q, args)
-	logger.Debugf(logger.GetStack())
-	return fmt.Errorf("DB is locked")
-}
-
-func dbQuery(db *sql.DB, q string, args ...interface{}) (*sql.Rows, error) {
-	for i := 0; i < 1000; i++ {
-		result, err := db.Query(q, args...)
-		if err == nil {
-			return result, nil
-		}
-		if !query.IsRetriableError(err) {
-			logger.Debugf("DbQuery: query %q error %q", q, err)
-			return nil, err
-		}
-		time.Sleep(30 * time.Millisecond)
-	}
-
-	logger.Debugf("DbQuery: query %q args %q, DB still locked", q, args)
-	logger.Debugf(logger.GetStack())
-	return nil, fmt.Errorf("DB is locked")
+	return query.Retry(func() error {
+		return query.Transaction(db, func(tx *sql.Tx) error {
+			return tx.QueryRow(q, args...).Scan(outargs...)
+		})
+	})
 }
 
-func doDbQueryScan(qi queryer, q string, args []interface{}, outargs []interface{}) ([][]interface{}, error) {
-	rows, err := qi.Query(q, args...)
-	if err != nil {
-		return [][]interface{}{}, err
-	}
-	defer rows.Close()
+func doDbQueryScan(db *sql.DB, q string, args []interface{}, outargs []interface{}) ([][]interface{}, error) {
 	result := [][]interface{}{}
-	for rows.Next() {
-		ptrargs := make([]interface{}, len(outargs))
-		for i := range outargs {
-			switch t := outargs[i].(type) {
-			case string:
-				str := ""
-				ptrargs[i] = &str
-			case int:
-				integer := 0
-				ptrargs[i] = &integer
-			case int64:
-				integer := int64(0)
-				ptrargs[i] = &integer
-			default:
-				return [][]interface{}{}, fmt.Errorf("Bad interface type: %s", t)
+
+	err := query.Retry(func() error {
+		return query.Transaction(db, func(tx *sql.Tx) error {
+			rows, err := tx.Query(q, args...)
+			if err != nil {
+				return err
 			}
-		}
-		err = rows.Scan(ptrargs...)
-		if err != nil {
-			return [][]interface{}{}, err
-		}
-		newargs := make([]interface{}, len(outargs))
-		for i := range ptrargs {
-			switch t := outargs[i].(type) {
-			case string:
-				newargs[i] = *ptrargs[i].(*string)
-			case int:
-				newargs[i] = *ptrargs[i].(*int)
-			case int64:
-				newargs[i] = *ptrargs[i].(*int64)
-			default:
-				return [][]interface{}{}, fmt.Errorf("Bad interface type: %s", t)
+			defer rows.Close()
+
+			for rows.Next() {
+				ptrargs := make([]interface{}, len(outargs))
+				for i := range outargs {
+					switch t := outargs[i].(type) {
+					case string:
+						str := ""
+						ptrargs[i] = &str
+					case int:
+						integer := 0
+						ptrargs[i] = &integer
+					case int64:
+						integer := int64(0)
+						ptrargs[i] = &integer
+					default:
+						return fmt.Errorf("Bad interface type: %s", t)
+					}
+				}
+				err = rows.Scan(ptrargs...)
+				if err != nil {
+					return err
+				}
+				newargs := make([]interface{}, len(outargs))
+				for i := range ptrargs {
+					switch t := outargs[i].(type) {
+					case string:
+						newargs[i] = *ptrargs[i].(*string)
+					case int:
+						newargs[i] = *ptrargs[i].(*int)
+					case int64:
+						newargs[i] = *ptrargs[i].(*int64)
+					default:
+						return fmt.Errorf("Bad interface type: %s", t)
+					}
+				}
+				result = append(result, newargs)
 			}
-		}
-		result = append(result, newargs)
-	}
-	err = rows.Err()
+			err = rows.Err()
+			if err != nil {
+				return err
+			}
+			return nil
+		})
+	})
 	if err != nil {
 		return [][]interface{}{}, err
 	}
+
 	return result, nil
-}
 
-type queryer interface {
-	Query(query string, args ...interface{}) (*sql.Rows, error)
 }
 
 /*
- * . qi anything implementing the querier interface (i.e. either sql.DB or sql.Tx)
+ * . db a reference to a sql.DB instance
  * . q is the database query
  * . inargs is an array of interfaces containing the query arguments
  * . outfmt is an array of interfaces containing the right types of output
@@ -483,38 +451,16 @@ type queryer interface {
  * The result will be an array (one per output row) of arrays (one per output argument)
  * of interfaces, containing pointers to the actual output arguments.
  */
-func queryScan(qi queryer, q string, inargs []interface{}, outfmt []interface{}) ([][]interface{}, error) {
-	for i := 0; i < 1000; i++ {
-		result, err := doDbQueryScan(qi, q, inargs, outfmt)
-		if err == nil {
-			return result, nil
-		}
-		if !query.IsRetriableError(err) {
-			logger.Debugf("DbQuery: query %q error %q", q, err)
-			return nil, err
-		}
-		time.Sleep(30 * time.Millisecond)
-	}
-
-	logger.Debugf("DbQueryscan: query %q inargs %q, DB still locked", q, inargs)
-	logger.Debugf(logger.GetStack())
-	return nil, fmt.Errorf("DB is locked")
+func queryScan(db *sql.DB, q string, inargs []interface{}, outfmt []interface{}) ([][]interface{}, error) {
+	return doDbQueryScan(db, q, inargs, outfmt)
 }
 
-func exec(db *sql.DB, q string, args ...interface{}) (sql.Result, error) {
-	for i := 0; i < 1000; i++ {
-		result, err := db.Exec(q, args...)
-		if err == nil {
-			return result, nil
-		}
-		if !query.IsRetriableError(err) {
-			logger.Debugf("DbExec: query %q error %q", q, err)
-			return nil, err
-		}
-		time.Sleep(30 * time.Millisecond)
-	}
-
-	logger.Debugf("DbExec: query %q args %q, DB still locked", q, args)
-	logger.Debugf(logger.GetStack())
-	return nil, fmt.Errorf("DB is locked")
+func exec(db *sql.DB, q string, args ...interface{}) error {
+	err := query.Retry(func() error {
+		return query.Transaction(db, func(tx *sql.Tx) error {
+			_, err := tx.Exec(q, args...)
+			return err
+		})
+	})
+	return err
 }
diff --git a/lxd/db/images.go b/lxd/db/images.go
index 6fe2e7e43..d19da26d3 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -92,7 +92,7 @@ func (c *Cluster) ImageSourceInsert(imageId int, server string, protocol string,
 		return fmt.Errorf("Invalid protocol: %s", protocol)
 	}
 
-	_, err := exec(c.db, stmt, imageId, server, protocolInt, certificate, alias)
+	err := exec(c.db, stmt, imageId, server, protocolInt, certificate, alias)
 	return err
 }
 
@@ -375,7 +375,7 @@ func (c *Cluster) ImageAssociateNode(fingerprint string) error {
 }
 
 func (c *Cluster) ImageDelete(id int) error {
-	_, err := exec(c.db, "DELETE FROM images WHERE id=?", id)
+	err := exec(c.db, "DELETE FROM images WHERE id=?", id)
 	if err != nil {
 		return err
 	}
@@ -432,42 +432,42 @@ func (c *Cluster) ImageAliasGet(name string, isTrustedClient bool) (int, api.Ima
 }
 
 func (c *Cluster) ImageAliasRename(id int, name string) error {
-	_, err := exec(c.db, "UPDATE images_aliases SET name=? WHERE id=?", name, id)
+	err := exec(c.db, "UPDATE images_aliases SET name=? WHERE id=?", name, id)
 	return err
 }
 
 func (c *Cluster) ImageAliasDelete(name string) error {
-	_, err := exec(c.db, "DELETE FROM images_aliases WHERE name=?", name)
+	err := exec(c.db, "DELETE FROM images_aliases WHERE name=?", name)
 	return err
 }
 
 func (c *Cluster) ImageAliasesMove(source int, destination int) error {
-	_, err := exec(c.db, "UPDATE images_aliases SET image_id=? WHERE image_id=?", destination, source)
+	err := exec(c.db, "UPDATE images_aliases SET image_id=? WHERE image_id=?", destination, source)
 	return err
 }
 
 // Insert an alias ento the database.
 func (c *Cluster) ImageAliasAdd(name string, imageID int, desc string) error {
 	stmt := `INSERT INTO images_aliases (name, image_id, description) values (?, ?, ?)`
-	_, err := exec(c.db, stmt, name, imageID, desc)
+	err := exec(c.db, stmt, name, imageID, desc)
 	return err
 }
 
 func (c *Cluster) ImageAliasUpdate(id int, imageID int, desc string) error {
 	stmt := `UPDATE images_aliases SET image_id=?, description=? WHERE id=?`
-	_, err := exec(c.db, stmt, imageID, desc, id)
+	err := exec(c.db, stmt, imageID, desc, id)
 	return err
 }
 
 func (c *Cluster) ImageLastAccessUpdate(fingerprint string, date time.Time) error {
 	stmt := `UPDATE images SET last_use_date=? WHERE fingerprint=?`
-	_, err := exec(c.db, stmt, date, fingerprint)
+	err := exec(c.db, stmt, date, fingerprint)
 	return err
 }
 
 func (c *Cluster) ImageLastAccessInit(fingerprint string) error {
 	stmt := `UPDATE images SET cached=1, last_use_date=strftime("%s") WHERE fingerprint=?`
-	_, err := exec(c.db, stmt, fingerprint)
+	err := exec(c.db, stmt, fingerprint)
 	return err
 }
 
@@ -652,6 +652,6 @@ func (c *Cluster) ImageGetPoolNamesFromIDs(poolIDs []int64) ([]string, error) {
 
 // ImageUploadedAt updates the upload_date column and an image row.
 func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
-	_, err := exec(c.db, "UPDATE images SET upload_date=? WHERE id=?", uploadedAt, id)
+	err := exec(c.db, "UPDATE images SET upload_date=? WHERE id=?", uploadedAt, id)
 	return err
 }
diff --git a/lxd/db/migration.go b/lxd/db/migration.go
index 51ef9fa2e..d2116ceb9 100644
--- a/lxd/db/migration.go
+++ b/lxd/db/migration.go
@@ -95,6 +95,7 @@ func (c *Cluster) ImportPreClusteringData(dump *Dump) error {
 	// gets created no matter what.
 	_, err = tx.Exec("DELETE FROM profiles WHERE id=1")
 	if err != nil {
+		tx.Rollback()
 		return errors.Wrap(err, "failed to delete default profile")
 	}
 
@@ -190,13 +191,16 @@ func (c *Cluster) ImportPreClusteringData(dump *Dump) error {
 			stmt += fmt.Sprintf(" VALUES %s", query.Params(len(columns)))
 			result, err := tx.Exec(stmt, row...)
 			if err != nil {
+				tx.Rollback()
 				return errors.Wrapf(err, "failed to insert row %d into %s", i, table)
 			}
 			n, err := result.RowsAffected()
 			if err != nil {
+				tx.Rollback()
 				return errors.Wrapf(err, "no result count for row %d of %s", i, table)
 			}
 			if n != 1 {
+				tx.Rollback()
 				return fmt.Errorf("could not insert %d int %s", i, table)
 			}
 
diff --git a/lxd/db/networks.go b/lxd/db/networks.go
index 2a97fd03e..215f21101 100644
--- a/lxd/db/networks.go
+++ b/lxd/db/networks.go
@@ -556,7 +556,7 @@ func (c *Cluster) NetworkDelete(name string) error {
 		return err
 	}
 
-	_, err = exec(c.db, "DELETE FROM networks WHERE id=?", id)
+	err = exec(c.db, "DELETE FROM networks WHERE id=?", id)
 	if err != nil {
 		return err
 	}
diff --git a/lxd/db/profiles.go b/lxd/db/profiles.go
index 8c95e4a0b..d36f93df7 100644
--- a/lxd/db/profiles.go
+++ b/lxd/db/profiles.go
@@ -165,7 +165,7 @@ func (c *Cluster) ProfileDelete(name string) error {
 		return err
 	}
 
-	_, err = exec(c.db, "DELETE FROM profiles WHERE id=?", id)
+	err = exec(c.db, "DELETE FROM profiles WHERE id=?", id)
 	if err != nil {
 		return err
 	}
@@ -263,7 +263,7 @@ DELETE FROM profiles_config WHERE profile_id NOT IN (SELECT id FROM profiles);
 DELETE FROM profiles_devices WHERE profile_id NOT IN (SELECT id FROM profiles);
 DELETE FROM profiles_devices_config WHERE profile_device_id NOT IN (SELECT id FROM profiles_devices);
 `
-	_, err := c.db.Exec(stmt)
+	err := exec(c.db, stmt)
 	if err != nil {
 		return err
 	}
diff --git a/lxd/db/query/retry.go b/lxd/db/query/retry.go
index c2f9676a6..8b3eef57c 100644
--- a/lxd/db/query/retry.go
+++ b/lxd/db/query/retry.go
@@ -19,7 +19,7 @@ func Retry(f func() error) error {
 	for i := 0; i < 20; i++ {
 		err = f()
 		if err != nil {
-			logger.Debugf("Database error %#v", err)
+			logger.Debugf("Database error: %#v", err)
 
 			if IsRetriableError(err) {
 				logger.Debugf("Retry failed db interaction (%v)", err)
diff --git a/lxd/db/storage_pools.go b/lxd/db/storage_pools.go
index 8b2da0904..050157d01 100644
--- a/lxd/db/storage_pools.go
+++ b/lxd/db/storage_pools.go
@@ -659,7 +659,7 @@ func (c *Cluster) StoragePoolDelete(poolName string) (*api.StoragePool, error) {
 		return nil, err
 	}
 
-	_, err = exec(c.db, "DELETE FROM storage_pools WHERE id=?", poolID)
+	err = exec(c.db, "DELETE FROM storage_pools WHERE id=?", poolID)
 	if err != nil {
 		return nil, err
 	}
@@ -842,6 +842,7 @@ func (c *Cluster) StoragePoolVolumeUpdate(volumeName string, volumeType int, poo
 		return StorageVolumeDescriptionUpdate(tx, volumeID, volumeDescription)
 	})
 	if err != nil {
+		tx.Rollback()
 		return err
 	}
 
@@ -1051,6 +1052,6 @@ func StoragePoolVolumeTypeToName(volumeType int) (string, error) {
 }
 
 func (c *Cluster) StoragePoolInsertZfsDriver() error {
-	_, err := exec(c.db, "UPDATE storage_pools SET driver='zfs', description='' WHERE driver=''")
+	err := exec(c.db, "UPDATE storage_pools SET driver='zfs', description='' WHERE driver=''")
 	return err
 }
diff --git a/lxd/db/storage_volumes.go b/lxd/db/storage_volumes.go
index 4df68a963..151ad7faf 100644
--- a/lxd/db/storage_volumes.go
+++ b/lxd/db/storage_volumes.go
@@ -178,17 +178,17 @@ func (c *Cluster) StorageVolumeCleanupImages(fingerprints []string) error {
 	for _, fingerprint := range fingerprints {
 		args = append(args, fingerprint)
 	}
-	_, err := exec(c.db, stmt, args...)
+	err := exec(c.db, stmt, args...)
 	return err
 }
 
 func (c *Cluster) StorageVolumeMoveToLVMThinPoolNameKey() error {
-	_, err := exec(c.db, "UPDATE storage_pools_config SET key='lvm.thinpool_name' WHERE key='volume.lvm.thinpool_name';")
+	err := exec(c.db, "UPDATE storage_pools_config SET key='lvm.thinpool_name' WHERE key='volume.lvm.thinpool_name';")
 	if err != nil {
 		return err
 	}
 
-	_, err = exec(c.db, "DELETE FROM storage_volumes_config WHERE key='lvm.thinpool_name';")
+	err = exec(c.db, "DELETE FROM storage_volumes_config WHERE key='lvm.thinpool_name';")
 	if err != nil {
 		return err
 	}


More information about the lxc-devel mailing list