[lxc-devel] [lxd/master] Always use transactions when querying the database
freeekanayaka on GitHub
lxc-bot at linuxcontainers.org
Thu Mar 22 09:14:57 UTC 2018
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 753 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180322/7449e85c/attachment.bin>
-------------- next part --------------
From 6c47e79abe7bd3c44bc4e60638fcb0ffbbf3bb00 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 21 Mar 2018 13:08:43 +0000
Subject: [PATCH] Always use transactions when querying the database
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/certificates.go | 44 ++++++-----
lxd/db/cluster/open.go | 14 +++-
lxd/db/containers.go | 10 +--
lxd/db/db.go | 198 +++++++++++++++++-----------------------------
lxd/db/images.go | 20 ++---
lxd/db/migration.go | 4 +
lxd/db/networks.go | 2 +-
lxd/db/profiles.go | 4 +-
lxd/db/query/retry.go | 2 +-
lxd/db/storage_pools.go | 5 +-
lxd/db/storage_volumes.go | 6 +-
11 files changed, 138 insertions(+), 171 deletions(-)
diff --git a/lxd/db/certificates.go b/lxd/db/certificates.go
index e773685e5..3f9317f40 100644
--- a/lxd/db/certificates.go
+++ b/lxd/db/certificates.go
@@ -12,28 +12,34 @@ type CertInfo struct {
// CertificatesGet returns all certificates from the DB as CertBaseInfo objects.
func (c *Cluster) CertificatesGet() (certs []*CertInfo, err error) {
- rows, err := dbQuery(
- c.db,
- "SELECT id, fingerprint, type, name, certificate FROM certificates",
- )
+ err = c.Transaction(func(tx *ClusterTx) error {
+ rows, err := tx.tx.Query(
+ "SELECT id, fingerprint, type, name, certificate FROM certificates",
+ )
+ if err != nil {
+ return err
+ }
+
+ defer rows.Close()
+
+ for rows.Next() {
+ cert := new(CertInfo)
+ rows.Scan(
+ &cert.ID,
+ &cert.Fingerprint,
+ &cert.Type,
+ &cert.Name,
+ &cert.Certificate,
+ )
+ certs = append(certs, cert)
+ }
+
+ return rows.Err()
+ })
if err != nil {
return certs, err
}
- defer rows.Close()
-
- for rows.Next() {
- cert := new(CertInfo)
- rows.Scan(
- &cert.ID,
- &cert.Fingerprint,
- &cert.Type,
- &cert.Name,
- &cert.Certificate,
- )
- certs = append(certs, cert)
- }
-
return certs, nil
}
@@ -104,7 +110,7 @@ func (c *Cluster) CertSave(cert *CertInfo) error {
// CertDelete deletes a certificate from the db.
func (c *Cluster) CertDelete(fingerprint string) error {
- _, err := exec(c.db, "DELETE FROM certificates WHERE fingerprint=?", fingerprint)
+ err := exec(c.db, "DELETE FROM certificates WHERE fingerprint=?", fingerprint)
if err != nil {
return err
}
diff --git a/lxd/db/cluster/open.go b/lxd/db/cluster/open.go
index c37fcd4e7..0ca5d50af 100644
--- a/lxd/db/cluster/open.go
+++ b/lxd/db/cluster/open.go
@@ -105,18 +105,28 @@ func EnsureSchema(db *sql.DB, address string) (bool, error) {
// 1. This is needed for referential integrity with other tables. Also,
// create a default profile.
if initial == 0 {
+ tx, err := db.Begin()
+ if err != nil {
+ return false, err
+ }
stmt := `
INSERT INTO nodes(id, name, address, schema, api_extensions) VALUES(1, 'none', '0.0.0.0', ?, ?)
`
- _, err := db.Exec(stmt, SchemaVersion, apiExtensions)
+ _, err = tx.Exec(stmt, SchemaVersion, apiExtensions)
if err != nil {
+ tx.Rollback()
return false, err
}
stmt = `
INSERT INTO profiles (name, description) VALUES ('default', 'Default LXD profile')
`
- _, err = db.Exec(stmt)
+ _, err = tx.Exec(stmt)
+ if err != nil {
+ tx.Rollback()
+ return false, err
+ }
+ err = tx.Commit()
if err != nil {
return false, err
}
diff --git a/lxd/db/containers.go b/lxd/db/containers.go
index 866f82bba..4712246ba 100644
--- a/lxd/db/containers.go
+++ b/lxd/db/containers.go
@@ -326,7 +326,7 @@ func (c *Cluster) ContainerRemove(name string) error {
return err
}
- _, err = exec(c.db, "DELETE FROM containers WHERE id=?", id)
+ err = exec(c.db, "DELETE FROM containers WHERE id=?", id)
if err != nil {
return err
}
@@ -539,7 +539,7 @@ func (c *Cluster) ContainerConfigGet(id int, key string) (string, error) {
}
func (c *Cluster) ContainerConfigRemove(id int, name string) error {
- _, err := exec(c.db, "DELETE FROM containers_config WHERE key=? AND container_id=?", name, id)
+ err := exec(c.db, "DELETE FROM containers_config WHERE key=? AND container_id=?", name, id)
return err
}
@@ -549,7 +549,7 @@ func (c *Cluster) ContainerSetStateful(id int, stateful bool) error {
statefulInt = 1
}
- _, err := exec(c.db, "UPDATE containers SET stateful=? WHERE id=?", statefulInt, id)
+ err := exec(c.db, "UPDATE containers SET stateful=? WHERE id=?", statefulInt, id)
return err
}
@@ -648,7 +648,7 @@ func (c *Cluster) ContainersList(cType ContainerType) ([]string, error) {
func (c *Cluster) ContainersResetState() error {
// Reset all container states
- _, err := exec(c.db, "DELETE FROM containers_config WHERE key='volatile.last_state.power'")
+ err := exec(c.db, "DELETE FROM containers_config WHERE key='volatile.last_state.power'")
return err
}
@@ -739,7 +739,7 @@ func ContainerUpdate(tx *sql.Tx, id int, description string, architecture int, e
func (c *Cluster) ContainerLastUsedUpdate(id int, date time.Time) error {
stmt := `UPDATE containers SET last_use_date=? WHERE id=?`
- _, err := exec(c.db, stmt, date, id)
+ err := exec(c.db, stmt, date, id)
return err
}
diff --git a/lxd/db/db.go b/lxd/db/db.go
index f6e7c0d0f..bf550d8bb 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -361,117 +361,85 @@ func begin(db *sql.DB) (*sql.Tx, error) {
}
func TxCommit(tx *sql.Tx) error {
- for i := 0; i < 1000; i++ {
- err := tx.Commit()
- if err == nil || err == sql.ErrTxDone { // Ignore duplicate commits/rollbacks
- return nil
- }
- if !query.IsRetriableError(err) {
- logger.Debugf("Txcommit: error %q", err)
- return err
- }
- time.Sleep(30 * time.Millisecond)
+ err := tx.Commit()
+ if err == nil || err == sql.ErrTxDone { // Ignore duplicate commits/rollbacks
+ return nil
}
-
- logger.Debugf("Txcommit: db still locked")
- logger.Debugf(logger.GetStack())
- return fmt.Errorf("DB is locked")
+ return err
}
func dbQueryRowScan(db *sql.DB, q string, args []interface{}, outargs []interface{}) error {
- for i := 0; i < 1000; i++ {
- err := db.QueryRow(q, args...).Scan(outargs...)
- if err == nil {
- return nil
- }
- if isNoMatchError(err) {
- return err
- }
- if !query.IsRetriableError(err) {
- return err
- }
- time.Sleep(30 * time.Millisecond)
- }
-
- logger.Debugf("DbQueryRowScan: query %q args %q, DB still locked", q, args)
- logger.Debugf(logger.GetStack())
- return fmt.Errorf("DB is locked")
-}
-
-func dbQuery(db *sql.DB, q string, args ...interface{}) (*sql.Rows, error) {
- for i := 0; i < 1000; i++ {
- result, err := db.Query(q, args...)
- if err == nil {
- return result, nil
- }
- if !query.IsRetriableError(err) {
- logger.Debugf("DbQuery: query %q error %q", q, err)
- return nil, err
- }
- time.Sleep(30 * time.Millisecond)
- }
-
- logger.Debugf("DbQuery: query %q args %q, DB still locked", q, args)
- logger.Debugf(logger.GetStack())
- return nil, fmt.Errorf("DB is locked")
+ return query.Retry(func() error {
+ return query.Transaction(db, func(tx *sql.Tx) error {
+ return tx.QueryRow(q, args...).Scan(outargs...)
+ })
+ })
}
-func doDbQueryScan(qi queryer, q string, args []interface{}, outargs []interface{}) ([][]interface{}, error) {
- rows, err := qi.Query(q, args...)
- if err != nil {
- return [][]interface{}{}, err
- }
- defer rows.Close()
+func doDbQueryScan(db *sql.DB, q string, args []interface{}, outargs []interface{}) ([][]interface{}, error) {
result := [][]interface{}{}
- for rows.Next() {
- ptrargs := make([]interface{}, len(outargs))
- for i := range outargs {
- switch t := outargs[i].(type) {
- case string:
- str := ""
- ptrargs[i] = &str
- case int:
- integer := 0
- ptrargs[i] = &integer
- case int64:
- integer := int64(0)
- ptrargs[i] = &integer
- default:
- return [][]interface{}{}, fmt.Errorf("Bad interface type: %s", t)
+
+ err := query.Retry(func() error {
+ return query.Transaction(db, func(tx *sql.Tx) error {
+ rows, err := tx.Query(q, args...)
+ if err != nil {
+ return err
}
- }
- err = rows.Scan(ptrargs...)
- if err != nil {
- return [][]interface{}{}, err
- }
- newargs := make([]interface{}, len(outargs))
- for i := range ptrargs {
- switch t := outargs[i].(type) {
- case string:
- newargs[i] = *ptrargs[i].(*string)
- case int:
- newargs[i] = *ptrargs[i].(*int)
- case int64:
- newargs[i] = *ptrargs[i].(*int64)
- default:
- return [][]interface{}{}, fmt.Errorf("Bad interface type: %s", t)
+ defer rows.Close()
+
+ for rows.Next() {
+ ptrargs := make([]interface{}, len(outargs))
+ for i := range outargs {
+ switch t := outargs[i].(type) {
+ case string:
+ str := ""
+ ptrargs[i] = &str
+ case int:
+ integer := 0
+ ptrargs[i] = &integer
+ case int64:
+ integer := int64(0)
+ ptrargs[i] = &integer
+ default:
+ return fmt.Errorf("Bad interface type: %s", t)
+ }
+ }
+ err = rows.Scan(ptrargs...)
+ if err != nil {
+ return err
+ }
+ newargs := make([]interface{}, len(outargs))
+ for i := range ptrargs {
+ switch t := outargs[i].(type) {
+ case string:
+ newargs[i] = *ptrargs[i].(*string)
+ case int:
+ newargs[i] = *ptrargs[i].(*int)
+ case int64:
+ newargs[i] = *ptrargs[i].(*int64)
+ default:
+ return fmt.Errorf("Bad interface type: %s", t)
+ }
+ }
+ result = append(result, newargs)
}
- }
- result = append(result, newargs)
- }
- err = rows.Err()
+ err = rows.Err()
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+ })
if err != nil {
return [][]interface{}{}, err
}
+
return result, nil
-}
-type queryer interface {
- Query(query string, args ...interface{}) (*sql.Rows, error)
}
/*
- * . qi anything implementing the querier interface (i.e. either sql.DB or sql.Tx)
+ * . db a reference to a sql.DB instance
* . q is the database query
* . inargs is an array of interfaces containing the query arguments
* . outfmt is an array of interfaces containing the right types of output
@@ -483,38 +451,16 @@ type queryer interface {
* The result will be an array (one per output row) of arrays (one per output argument)
* of interfaces, containing pointers to the actual output arguments.
*/
-func queryScan(qi queryer, q string, inargs []interface{}, outfmt []interface{}) ([][]interface{}, error) {
- for i := 0; i < 1000; i++ {
- result, err := doDbQueryScan(qi, q, inargs, outfmt)
- if err == nil {
- return result, nil
- }
- if !query.IsRetriableError(err) {
- logger.Debugf("DbQuery: query %q error %q", q, err)
- return nil, err
- }
- time.Sleep(30 * time.Millisecond)
- }
-
- logger.Debugf("DbQueryscan: query %q inargs %q, DB still locked", q, inargs)
- logger.Debugf(logger.GetStack())
- return nil, fmt.Errorf("DB is locked")
+func queryScan(db *sql.DB, q string, inargs []interface{}, outfmt []interface{}) ([][]interface{}, error) {
+ return doDbQueryScan(db, q, inargs, outfmt)
}
-func exec(db *sql.DB, q string, args ...interface{}) (sql.Result, error) {
- for i := 0; i < 1000; i++ {
- result, err := db.Exec(q, args...)
- if err == nil {
- return result, nil
- }
- if !query.IsRetriableError(err) {
- logger.Debugf("DbExec: query %q error %q", q, err)
- return nil, err
- }
- time.Sleep(30 * time.Millisecond)
- }
-
- logger.Debugf("DbExec: query %q args %q, DB still locked", q, args)
- logger.Debugf(logger.GetStack())
- return nil, fmt.Errorf("DB is locked")
+func exec(db *sql.DB, q string, args ...interface{}) error {
+ err := query.Retry(func() error {
+ return query.Transaction(db, func(tx *sql.Tx) error {
+ _, err := tx.Exec(q, args...)
+ return err
+ })
+ })
+ return err
}
diff --git a/lxd/db/images.go b/lxd/db/images.go
index 6fe2e7e43..d19da26d3 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -92,7 +92,7 @@ func (c *Cluster) ImageSourceInsert(imageId int, server string, protocol string,
return fmt.Errorf("Invalid protocol: %s", protocol)
}
- _, err := exec(c.db, stmt, imageId, server, protocolInt, certificate, alias)
+ err := exec(c.db, stmt, imageId, server, protocolInt, certificate, alias)
return err
}
@@ -375,7 +375,7 @@ func (c *Cluster) ImageAssociateNode(fingerprint string) error {
}
func (c *Cluster) ImageDelete(id int) error {
- _, err := exec(c.db, "DELETE FROM images WHERE id=?", id)
+ err := exec(c.db, "DELETE FROM images WHERE id=?", id)
if err != nil {
return err
}
@@ -432,42 +432,42 @@ func (c *Cluster) ImageAliasGet(name string, isTrustedClient bool) (int, api.Ima
}
func (c *Cluster) ImageAliasRename(id int, name string) error {
- _, err := exec(c.db, "UPDATE images_aliases SET name=? WHERE id=?", name, id)
+ err := exec(c.db, "UPDATE images_aliases SET name=? WHERE id=?", name, id)
return err
}
func (c *Cluster) ImageAliasDelete(name string) error {
- _, err := exec(c.db, "DELETE FROM images_aliases WHERE name=?", name)
+ err := exec(c.db, "DELETE FROM images_aliases WHERE name=?", name)
return err
}
func (c *Cluster) ImageAliasesMove(source int, destination int) error {
- _, err := exec(c.db, "UPDATE images_aliases SET image_id=? WHERE image_id=?", destination, source)
+ err := exec(c.db, "UPDATE images_aliases SET image_id=? WHERE image_id=?", destination, source)
return err
}
// Insert an alias ento the database.
func (c *Cluster) ImageAliasAdd(name string, imageID int, desc string) error {
stmt := `INSERT INTO images_aliases (name, image_id, description) values (?, ?, ?)`
- _, err := exec(c.db, stmt, name, imageID, desc)
+ err := exec(c.db, stmt, name, imageID, desc)
return err
}
func (c *Cluster) ImageAliasUpdate(id int, imageID int, desc string) error {
stmt := `UPDATE images_aliases SET image_id=?, description=? WHERE id=?`
- _, err := exec(c.db, stmt, imageID, desc, id)
+ err := exec(c.db, stmt, imageID, desc, id)
return err
}
func (c *Cluster) ImageLastAccessUpdate(fingerprint string, date time.Time) error {
stmt := `UPDATE images SET last_use_date=? WHERE fingerprint=?`
- _, err := exec(c.db, stmt, date, fingerprint)
+ err := exec(c.db, stmt, date, fingerprint)
return err
}
func (c *Cluster) ImageLastAccessInit(fingerprint string) error {
stmt := `UPDATE images SET cached=1, last_use_date=strftime("%s") WHERE fingerprint=?`
- _, err := exec(c.db, stmt, fingerprint)
+ err := exec(c.db, stmt, fingerprint)
return err
}
@@ -652,6 +652,6 @@ func (c *Cluster) ImageGetPoolNamesFromIDs(poolIDs []int64) ([]string, error) {
// ImageUploadedAt updates the upload_date column and an image row.
func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
- _, err := exec(c.db, "UPDATE images SET upload_date=? WHERE id=?", uploadedAt, id)
+ err := exec(c.db, "UPDATE images SET upload_date=? WHERE id=?", uploadedAt, id)
return err
}
diff --git a/lxd/db/migration.go b/lxd/db/migration.go
index 51ef9fa2e..d2116ceb9 100644
--- a/lxd/db/migration.go
+++ b/lxd/db/migration.go
@@ -95,6 +95,7 @@ func (c *Cluster) ImportPreClusteringData(dump *Dump) error {
// gets created no matter what.
_, err = tx.Exec("DELETE FROM profiles WHERE id=1")
if err != nil {
+ tx.Rollback()
return errors.Wrap(err, "failed to delete default profile")
}
@@ -190,13 +191,16 @@ func (c *Cluster) ImportPreClusteringData(dump *Dump) error {
stmt += fmt.Sprintf(" VALUES %s", query.Params(len(columns)))
result, err := tx.Exec(stmt, row...)
if err != nil {
+ tx.Rollback()
return errors.Wrapf(err, "failed to insert row %d into %s", i, table)
}
n, err := result.RowsAffected()
if err != nil {
+ tx.Rollback()
return errors.Wrapf(err, "no result count for row %d of %s", i, table)
}
if n != 1 {
+ tx.Rollback()
return fmt.Errorf("could not insert %d int %s", i, table)
}
diff --git a/lxd/db/networks.go b/lxd/db/networks.go
index 2a97fd03e..215f21101 100644
--- a/lxd/db/networks.go
+++ b/lxd/db/networks.go
@@ -556,7 +556,7 @@ func (c *Cluster) NetworkDelete(name string) error {
return err
}
- _, err = exec(c.db, "DELETE FROM networks WHERE id=?", id)
+ err = exec(c.db, "DELETE FROM networks WHERE id=?", id)
if err != nil {
return err
}
diff --git a/lxd/db/profiles.go b/lxd/db/profiles.go
index 8c95e4a0b..d36f93df7 100644
--- a/lxd/db/profiles.go
+++ b/lxd/db/profiles.go
@@ -165,7 +165,7 @@ func (c *Cluster) ProfileDelete(name string) error {
return err
}
- _, err = exec(c.db, "DELETE FROM profiles WHERE id=?", id)
+ err = exec(c.db, "DELETE FROM profiles WHERE id=?", id)
if err != nil {
return err
}
@@ -263,7 +263,7 @@ DELETE FROM profiles_config WHERE profile_id NOT IN (SELECT id FROM profiles);
DELETE FROM profiles_devices WHERE profile_id NOT IN (SELECT id FROM profiles);
DELETE FROM profiles_devices_config WHERE profile_device_id NOT IN (SELECT id FROM profiles_devices);
`
- _, err := c.db.Exec(stmt)
+ err := exec(c.db, stmt)
if err != nil {
return err
}
diff --git a/lxd/db/query/retry.go b/lxd/db/query/retry.go
index c2f9676a6..8b3eef57c 100644
--- a/lxd/db/query/retry.go
+++ b/lxd/db/query/retry.go
@@ -19,7 +19,7 @@ func Retry(f func() error) error {
for i := 0; i < 20; i++ {
err = f()
if err != nil {
- logger.Debugf("Database error %#v", err)
+ logger.Debugf("Database error: %#v", err)
if IsRetriableError(err) {
logger.Debugf("Retry failed db interaction (%v)", err)
diff --git a/lxd/db/storage_pools.go b/lxd/db/storage_pools.go
index 8b2da0904..050157d01 100644
--- a/lxd/db/storage_pools.go
+++ b/lxd/db/storage_pools.go
@@ -659,7 +659,7 @@ func (c *Cluster) StoragePoolDelete(poolName string) (*api.StoragePool, error) {
return nil, err
}
- _, err = exec(c.db, "DELETE FROM storage_pools WHERE id=?", poolID)
+ err = exec(c.db, "DELETE FROM storage_pools WHERE id=?", poolID)
if err != nil {
return nil, err
}
@@ -842,6 +842,7 @@ func (c *Cluster) StoragePoolVolumeUpdate(volumeName string, volumeType int, poo
return StorageVolumeDescriptionUpdate(tx, volumeID, volumeDescription)
})
if err != nil {
+ tx.Rollback()
return err
}
@@ -1051,6 +1052,6 @@ func StoragePoolVolumeTypeToName(volumeType int) (string, error) {
}
func (c *Cluster) StoragePoolInsertZfsDriver() error {
- _, err := exec(c.db, "UPDATE storage_pools SET driver='zfs', description='' WHERE driver=''")
+ err := exec(c.db, "UPDATE storage_pools SET driver='zfs', description='' WHERE driver=''")
return err
}
diff --git a/lxd/db/storage_volumes.go b/lxd/db/storage_volumes.go
index 4df68a963..151ad7faf 100644
--- a/lxd/db/storage_volumes.go
+++ b/lxd/db/storage_volumes.go
@@ -178,17 +178,17 @@ func (c *Cluster) StorageVolumeCleanupImages(fingerprints []string) error {
for _, fingerprint := range fingerprints {
args = append(args, fingerprint)
}
- _, err := exec(c.db, stmt, args...)
+ err := exec(c.db, stmt, args...)
return err
}
func (c *Cluster) StorageVolumeMoveToLVMThinPoolNameKey() error {
- _, err := exec(c.db, "UPDATE storage_pools_config SET key='lvm.thinpool_name' WHERE key='volume.lvm.thinpool_name';")
+ err := exec(c.db, "UPDATE storage_pools_config SET key='lvm.thinpool_name' WHERE key='volume.lvm.thinpool_name';")
if err != nil {
return err
}
- _, err = exec(c.db, "DELETE FROM storage_volumes_config WHERE key='lvm.thinpool_name';")
+ err = exec(c.db, "DELETE FROM storage_volumes_config WHERE key='lvm.thinpool_name';")
if err != nil {
return err
}
More information about the lxc-devel mailing list