[lxc-devel] [lxd/master] Wire new dqlite implementation

freeekanayaka on Github lxc-bot at linuxcontainers.org
Mon Jul 30 20:49:33 UTC 2018


From ca98f334ecaf4104609a1f5980ca92207857d05e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 12 Jul 2018 08:33:26 +0000
Subject: [PATCH 1/7] Use mattn's sqlite3 bindings in the lxd/db sub package

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/db/query/retry.go   | 2 +-
 lxd/db/schema/update.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/lxd/db/query/retry.go b/lxd/db/query/retry.go
index 65ef4636d..0c78a8c7f 100644
--- a/lxd/db/query/retry.go
+++ b/lxd/db/query/retry.go
@@ -4,8 +4,8 @@ import (
 	"strings"
 	"time"
 
-	"github.com/CanonicalLtd/go-sqlite3"
 	"github.com/lxc/lxd/shared/logger"
+	"github.com/mattn/go-sqlite3"
 	"github.com/pkg/errors"
 )
 
diff --git a/lxd/db/schema/update.go b/lxd/db/schema/update.go
index 121cd1c73..3a9da889d 100644
--- a/lxd/db/schema/update.go
+++ b/lxd/db/schema/update.go
@@ -7,7 +7,7 @@ import (
 	"path"
 	"runtime"
 
-	_ "github.com/CanonicalLtd/go-sqlite3" // For opening the in-memory database
+	_ "github.com/mattn/go-sqlite3" // For opening the in-memory database
 )
 
 // DotGo writes '<name>.go' source file in the package of the calling function, containing
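
For reference, mattn/go-sqlite3 registers itself under the "sqlite3"
driver name from an init() function, so the blank import above is all
the schema package needs. A minimal standalone sketch of that pattern
(illustrative, not part of the patch):

    package main

    import (
        "database/sql"
        "fmt"

        _ "github.com/mattn/go-sqlite3" // init() registers the "sqlite3" driver
    )

    func main() {
        db, err := sql.Open("sqlite3", ":memory:")
        if err != nil {
            panic(err)
        }
        defer db.Close()

        var version string
        if err := db.QueryRow("SELECT sqlite_version()").Scan(&version); err != nil {
            panic(err)
        }
        fmt.Println("sqlite", version)
    }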

From e0b63cbeb48f15cb1b4c9619e74a6ba8e2698b14 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 12 Jul 2018 09:06:41 +0000
Subject: [PATCH 2/7] Drop go-1.6 code

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/db/query/slices.go          | 19 +++++++++++++++++++
 lxd/db/query/slices_1_6.go      | 15 ---------------
 lxd/db/query/slices_1_6_test.go | 18 ------------------
 lxd/db/query/slices_1_8.go      | 26 --------------------------
 lxd/db/query/slices_1_8_test.go | 21 ---------------------
 lxd/db/query/slices_test.go     | 18 ++++++++++++++++++
 6 files changed, 37 insertions(+), 80 deletions(-)
 delete mode 100644 lxd/db/query/slices_1_6.go
 delete mode 100644 lxd/db/query/slices_1_6_test.go
 delete mode 100644 lxd/db/query/slices_1_8.go
 delete mode 100644 lxd/db/query/slices_1_8_test.go

diff --git a/lxd/db/query/slices.go b/lxd/db/query/slices.go
index 6cd9a7934..c86deca42 100644
--- a/lxd/db/query/slices.go
+++ b/lxd/db/query/slices.go
@@ -104,3 +104,22 @@ func scanSingleColumn(tx *sql.Tx, query string, args []interface{}, typeName str
 
 // Function to scan a single row.
 type scanFunc func(*sql.Rows) error
+
+// Check that the given result set yields rows with a single column of a
+// specific type.
+func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error {
+	types, err := rows.ColumnTypes()
+	if err != nil {
+		return err
+	}
+	if len(types) != 1 {
+		return fmt.Errorf("query yields %d columns, not 1", len(types))
+	}
+
+	actualTypeName := strings.ToUpper(types[0].DatabaseTypeName())
+	if actualTypeName != typeName {
+		return fmt.Errorf("query yields %s column, not %s", actualTypeName, typeName)
+	}
+
+	return nil
+}
diff --git a/lxd/db/query/slices_1_6.go b/lxd/db/query/slices_1_6.go
deleted file mode 100644
index 652147b70..000000000
--- a/lxd/db/query/slices_1_6.go
+++ /dev/null
@@ -1,15 +0,0 @@
-// +build !go1.8
-
-package query
-
-import "database/sql"
-
-// Check that the given result set yields rows with a single column of a
-// specific type.
-func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error {
-	// The Rows.ColumnTypes() method is available only since Go 1.8, so we
-	// just return nil for <1.8. This is safe to do since if the returned
-	// rows are not of the expected type, call sites will still fail at
-	// Rows.Scan() time, although the error message will be less clear.
-	return nil
-}
diff --git a/lxd/db/query/slices_1_6_test.go b/lxd/db/query/slices_1_6_test.go
deleted file mode 100644
index d508edecf..000000000
--- a/lxd/db/query/slices_1_6_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-// +build !go1.8
-
-package query_test
-
-var testStringsErrorCases = []struct {
-	query string
-	error string
-}{
-	{"garbage", "near \"garbage\": syntax error"},
-	{"SELECT id, name FROM test", "sql: expected 2 destination arguments in Scan, not 1"},
-}
-
-var testIntegersErrorCases = []struct {
-	query string
-	error string
-}{
-	{"garbage", "near \"garbage\": syntax error"},
-}
diff --git a/lxd/db/query/slices_1_8.go b/lxd/db/query/slices_1_8.go
deleted file mode 100644
index 140de7c2a..000000000
--- a/lxd/db/query/slices_1_8.go
+++ /dev/null
@@ -1,26 +0,0 @@
-// +build go1.8
-
-package query
-
-import (
-	"database/sql"
-	"fmt"
-	"strings"
-)
-
-// Check that the given result set yields rows with a single column of a
-// specific type.
-func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error {
-	types, err := rows.ColumnTypes()
-	if err != nil {
-		return err
-	}
-	if len(types) != 1 {
-		return fmt.Errorf("query yields %d columns, not 1", len(types))
-	}
-	actualTypeName := strings.ToUpper(types[0].DatabaseTypeName())
-	if actualTypeName != typeName {
-		return fmt.Errorf("query yields %s column, not %s", actualTypeName, typeName)
-	}
-	return nil
-}
diff --git a/lxd/db/query/slices_1_8_test.go b/lxd/db/query/slices_1_8_test.go
deleted file mode 100644
index dae33c2ab..000000000
--- a/lxd/db/query/slices_1_8_test.go
+++ /dev/null
@@ -1,21 +0,0 @@
-// +build go1.8
-
-package query_test
-
-var testStringsErrorCases = []struct {
-	query string
-	error string
-}{
-	{"garbage", "near \"garbage\": syntax error"},
-	{"SELECT id, name FROM test", "query yields 2 columns, not 1"},
-	{"SELECT id FROM test", "query yields INTEGER column, not TEXT"},
-}
-
-var testIntegersErrorCases = []struct {
-	query string
-	error string
-}{
-	{"garbage", "near \"garbage\": syntax error"},
-	{"SELECT id, name FROM test", "query yields 2 columns, not 1"},
-	{"SELECT name FROM test", "query yields TEXT column, not INTEGER"},
-}
diff --git a/lxd/db/query/slices_test.go b/lxd/db/query/slices_test.go
index 2b4a1cce2..3e028528b 100644
--- a/lxd/db/query/slices_test.go
+++ b/lxd/db/query/slices_test.go
@@ -23,6 +23,15 @@ func TestStrings_Error(t *testing.T) {
 	}
 }
 
+var testStringsErrorCases = []struct {
+	query string
+	error string
+}{
+	{"garbage", "near \"garbage\": syntax error"},
+	{"SELECT id, name FROM test", "query yields 2 columns, not 1"},
+	{"SELECT id FROM test", "query yields INTEGER column, not TEXT"},
+}
+
 // All values yielded by the query are returned.
 func TestStrings(t *testing.T) {
 	tx := newTxForSlices(t)
@@ -43,6 +52,15 @@ func TestIntegers_Error(t *testing.T) {
 	}
 }
 
+var testIntegersErrorCases = []struct {
+	query string
+	error string
+}{
+	{"garbage", "near \"garbage\": syntax error"},
+	{"SELECT id, name FROM test", "query yields 2 columns, not 1"},
+	{"SELECT name FROM test", "query yields TEXT column, not INTEGER"},
+}
+
 // All values yielded by the query are returned.
 func TestIntegers(t *testing.T) {
 	tx := newTxForSlices(t)
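
With the go1.6 build-tag split gone, the check can rely on
Rows.ColumnTypes() (available since Go 1.8) unconditionally. A sketch
of how it slots into a single-column scan; selectNames and the test
table are hypothetical, only checkRowsHaveOneColumnOfSpecificType
comes from the patch:

    func selectNames(tx *sql.Tx) ([]string, error) {
        rows, err := tx.Query("SELECT name FROM test")
        if err != nil {
            return nil, err
        }
        defer rows.Close()

        // Fail fast with a clear error if the result set shape is wrong.
        if err := checkRowsHaveOneColumnOfSpecificType(rows, "TEXT"); err != nil {
            return nil, err
        }

        names := []string{}
        for rows.Next() {
            var name string
            if err := rows.Scan(&name); err != nil {
                return nil, err
            }
            names = append(names, name)
        }

        return names, rows.Err()
    }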

From 5e7425ff80ed064b6783c6b8c8aa4cb52be5db95 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 12 Jul 2018 12:10:59 +0000
Subject: [PATCH 3/7] Replace grpc-sql with dqlite custom protocol

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/db/cluster/open.go      | 28 ++++++++------
 lxd/db/db.go                |  9 +++--
 lxd/db/migration.go         |  2 +-
 lxd/db/node/sqlite.go       |  2 +-
 lxd/db/query/slices.go      | 24 ------------
 lxd/db/query/slices_test.go |  6 +--
 lxd/db/schema/schema.go     |  3 ++
 lxd/db/testing.go           | 90 +++++++++++++++++++++++++++++++--------------
 8 files changed, 92 insertions(+), 72 deletions(-)

diff --git a/lxd/db/cluster/open.go b/lxd/db/cluster/open.go
index 3cb7c5c3e..20e456e20 100644
--- a/lxd/db/cluster/open.go
+++ b/lxd/db/cluster/open.go
@@ -6,7 +6,7 @@ import (
 	"path/filepath"
 	"sync/atomic"
 
-	"github.com/CanonicalLtd/go-grpc-sql"
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/lxc/lxd/lxd/db/query"
 	"github.com/lxc/lxd/lxd/db/schema"
 	"github.com/lxc/lxd/lxd/util"
@@ -23,18 +23,22 @@ import (
 //
 // The store argument is used by the dqlite driver to connect to the dqlite
 // nodes of the cluster.
-func Open(name string, dialer grpcsql.Dialer) (*sql.DB, error) {
-	driver := grpcsql.NewDriver(dialer)
-	driverName := grpcSQLDriverName()
+func Open(name string, store dqlite.ServerStore, options ...dqlite.DriverOption) (*sql.DB, error) {
+	driver, err := dqlite.NewDriver(store, options...)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to create dqlite driver")
+	}
+
+	driverName := dqliteDriverName()
 	sql.Register(driverName, driver)
 
-	// Create the cluster db. This won't immediately establish any gRPC
+	// Create the cluster db. This won't immediately establish any network
 	// connection, that will happen only when a db transaction is started
 	// (see the database/sql connection pooling code for more details).
 	if name == "" {
 		name = "db.bin"
 	}
-	db, err := sql.Open(driverName, name+"?_foreign_keys=1")
+	db, err := sql.Open(driverName, name)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open cluster database: %v", err)
 	}
@@ -191,18 +195,18 @@ INSERT INTO profiles (name, description) VALUES ('default', 'Default LXD profile
 	return true, err
 }
 
-// Generate a new name for the grpcsql driver registration. We need it to be
+// Generate a new name for the dqlite driver registration. We need it to be
 // unique for testing, see below.
-func grpcSQLDriverName() string {
-	defer atomic.AddUint64(&grpcSQLDriverSerial, 1)
-	return fmt.Sprintf("grpc-%d", grpcSQLDriverSerial)
+func dqliteDriverName() string {
+	defer atomic.AddUint64(&dqliteDriverSerial, 1)
+	return fmt.Sprintf("dqlite-%d", dqliteDriverSerial)
 }
 
-// Monotonic serial number for registering new instances of grpcsql.Driver
+// Monotonic serial number for registering new instances of dqlite.Driver
 // using the database/sql stdlib package. This is needed since there's no way
 // to unregister drivers, and in unit tests more than one driver gets
 // registered.
-var grpcSQLDriverSerial uint64
+var dqliteDriverSerial uint64
 
 func checkClusterIsUpgradable(tx *sql.Tx, target [2]int) error {
 	// Get the current versions in the nodes table.
diff --git a/lxd/db/db.go b/lxd/db/db.go
index c85ec9c55..c439ec19a 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -6,7 +6,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/CanonicalLtd/go-grpc-sql"
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 
@@ -59,6 +59,9 @@ func OpenNode(dir string, fresh func(*Node) error, legacyPatches map[int]*Legacy
 		return nil, nil, err
 	}
 
+	db.SetMaxOpenConns(1)
+	db.SetMaxIdleConns(1)
+
 	legacyHook := legacyPatchHook(db, legacyPatches)
 	hook := func(version int, tx *sql.Tx) error {
 		if version == node.UpdateFromPreClustering {
@@ -155,8 +158,8 @@ type Cluster struct {
 // database matches our version, and possibly trigger a schema update. If the
 // schema update can't be performed right now, because some nodes are still
 // behind, an Upgrading error is returned.
-func OpenCluster(name string, dialer grpcsql.Dialer, address, dir string) (*Cluster, error) {
-	db, err := cluster.Open(name, dialer)
+func OpenCluster(name string, store dqlite.ServerStore, address, dir string, options ...dqlite.DriverOption) (*Cluster, error) {
+	db, err := cluster.Open(name, store, options...)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to open database")
 	}
diff --git a/lxd/db/migration.go b/lxd/db/migration.go
index 1f06db1bd..ee769f53c 100644
--- a/lxd/db/migration.go
+++ b/lxd/db/migration.go
@@ -230,7 +230,7 @@ func importNodeAssociation(entity string, columns []string, row []interface{}, t
 	if id == 0 {
 		return fmt.Errorf("entity %s has invalid ID", entity)
 	}
-	_, err := tx.Exec(stmt, row...)
+	_, err := tx.Exec(stmt, id)
 	if err != nil {
 		return errors.Wrapf(err, "failed to associate %s to node", entity)
 	}
diff --git a/lxd/db/node/sqlite.go b/lxd/db/node/sqlite.go
index 235b8a776..eecef80ab 100644
--- a/lxd/db/node/sqlite.go
+++ b/lxd/db/node/sqlite.go
@@ -4,7 +4,7 @@ import (
 	"database/sql"
 	"fmt"
 
-	"github.com/CanonicalLtd/go-sqlite3"
+	"github.com/mattn/go-sqlite3"
 )
 
 func init() {
diff --git a/lxd/db/query/slices.go b/lxd/db/query/slices.go
index c86deca42..b62335203 100644
--- a/lxd/db/query/slices.go
+++ b/lxd/db/query/slices.go
@@ -83,11 +83,6 @@ func scanSingleColumn(tx *sql.Tx, query string, args []interface{}, typeName str
 	}
 	defer rows.Close()
 
-	err = checkRowsHaveOneColumnOfSpecificType(rows, typeName)
-	if err != nil {
-		return err
-	}
-
 	for rows.Next() {
 		err := scan(rows)
 		if err != nil {
@@ -104,22 +99,3 @@ func scanSingleColumn(tx *sql.Tx, query string, args []interface{}, typeName str
 
 // Function to scan a single row.
 type scanFunc func(*sql.Rows) error
-
-// Check that the given result set yields rows with a single column of a
-// specific type.
-func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error {
-	types, err := rows.ColumnTypes()
-	if err != nil {
-		return err
-	}
-	if len(types) != 1 {
-		return fmt.Errorf("query yields %d columns, not 1", len(types))
-	}
-
-	actualTypeName := strings.ToUpper(types[0].DatabaseTypeName())
-	if actualTypeName != typeName {
-		return fmt.Errorf("query yields %s column, not %s", actualTypeName, typeName)
-	}
-
-	return nil
-}
diff --git a/lxd/db/query/slices_test.go b/lxd/db/query/slices_test.go
index 3e028528b..6a37d8ba3 100644
--- a/lxd/db/query/slices_test.go
+++ b/lxd/db/query/slices_test.go
@@ -28,8 +28,7 @@ var testStringsErrorCases = []struct {
 	error string
 }{
 	{"garbage", "near \"garbage\": syntax error"},
-	{"SELECT id, name FROM test", "query yields 2 columns, not 1"},
-	{"SELECT id FROM test", "query yields INTEGER column, not TEXT"},
+	{"SELECT id, name FROM test", "sql: expected 2 destination arguments in Scan, not 1"},
 }
 
 // All values yielded by the query are returned.
@@ -57,8 +56,7 @@ var testIntegersErrorCases = []struct {
 	error string
 }{
 	{"garbage", "near \"garbage\": syntax error"},
-	{"SELECT id, name FROM test", "query yields 2 columns, not 1"},
-	{"SELECT name FROM test", "query yields TEXT column, not INTEGER"},
+	{"SELECT id, name FROM test", "sql: expected 2 destination arguments in Scan, not 1"},
 }
 
 // All values yielded by the query are returned.
diff --git a/lxd/db/schema/schema.go b/lxd/db/schema/schema.go
index 040ab6d7a..e576c7887 100644
--- a/lxd/db/schema/schema.go
+++ b/lxd/db/schema/schema.go
@@ -148,6 +148,7 @@ func (s *Schema) Ensure(db *sql.DB) (int, error) {
 		if err != nil {
 			return err
 		}
+
 		current, err = queryCurrentVersion(tx)
 		if err != nil {
 			return err
@@ -165,6 +166,7 @@ func (s *Schema) Ensure(db *sql.DB) (int, error) {
 				return err
 			}
 		}
+
 		// When creating the schema from scratch, use the fresh dump if
 		// available. Otherwise just apply all relevant updates.
 		if current == 0 && s.fresh != "" {
@@ -315,6 +317,7 @@ func queryCurrentVersion(tx *sql.Tx) (int, error) {
 		}
 		current = versions[len(versions)-1] // Highest recorded version
 	}
+
 	return current, nil
 }
 
diff --git a/lxd/db/testing.go b/lxd/db/testing.go
index e2d77cb4d..18a59e303 100644
--- a/lxd/db/testing.go
+++ b/lxd/db/testing.go
@@ -1,17 +1,17 @@
 package db
 
 import (
+	"context"
+	"fmt"
 	"io/ioutil"
 	"net"
 	"os"
 	"testing"
-	"time"
 
-	"github.com/CanonicalLtd/go-grpc-sql"
-	"github.com/CanonicalLtd/go-sqlite3"
-	"github.com/lxc/lxd/lxd/util"
+	"github.com/CanonicalLtd/go-dqlite"
+	"github.com/CanonicalLtd/raft-test"
+	"github.com/hashicorp/raft"
 	"github.com/stretchr/testify/require"
-	"google.golang.org/grpc"
 )
 
 // NewTestNode creates a new Node for testing purposes, along with a function
@@ -53,15 +53,23 @@ func NewTestNodeTx(t *testing.T) (*NodeTx, func()) {
 // NewTestCluster creates a new Cluster for testing purposes, along with a function
 // that can be used to clean it up when done.
 func NewTestCluster(t *testing.T) (*Cluster, func()) {
-	// Create an in-memory gRPC SQL server and dialer.
-	server, dialer := newGrpcServer()
+	// Create an in-memory dqlite SQL server and associated store.
+	store, serverCleanup := newDqliteServer(t)
 
-	cluster, err := OpenCluster(":memory:", dialer, "1", "/unused/db/dir")
+	log := newLogFunc(t)
+
+	dial := func(ctx context.Context, address string) (net.Conn, error) {
+		return net.Dial("unix", address)
+	}
+
+	cluster, err := OpenCluster(
+		"test.db", store, "1", "/unused/db/dir",
+		dqlite.WithLogFunc(log), dqlite.WithDialFunc(dial))
 	require.NoError(t, err)
 
 	cleanup := func() {
 		require.NoError(t, cluster.Close())
-		server.Stop()
+		serverCleanup()
 	}
 
 	return cluster, cleanup
@@ -87,26 +95,54 @@ func NewTestClusterTx(t *testing.T) (*ClusterTx, func()) {
 	return clusterTx, cleanup
 }
 
-// Create a new in-memory gRPC server attached to a grpc-sql gateway backed by a
-// SQLite driver.
+// Create a new in-memory dqlite server.
 //
-// Return the newly created gRPC server and a dialer that can be used to
-// connect to it.
-func newGrpcServer() (*grpc.Server, grpcsql.Dialer) {
-	listener, dial := util.InMemoryNetwork()
-	server := grpcsql.NewServer(&sqlite3.SQLiteDriver{})
-
-	// Setup an in-memory gRPC dialer.
-	options := []grpc.DialOption{
-		grpc.WithInsecure(),
-		grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
-			return dial(), nil
-		}),
+// Return the newly created server store that can be used to connect to it.
+func newDqliteServer(t *testing.T) (*dqlite.DatabaseServerStore, func()) {
+	t.Helper()
+
+	listener, err := net.Listen("unix", "")
+	require.NoError(t, err)
+
+	address := listener.Addr().String()
+
+	store, err := dqlite.DefaultServerStore(":memory:")
+	require.NoError(t, err)
+	require.NoError(t, store.Set(context.Background(), []dqlite.ServerInfo{{Address: address}}))
+
+	id := fmt.Sprintf("%d", dqliteSerial)
+	dqliteSerial++
+	registry := dqlite.NewRegistry(id)
+
+	fsm := dqlite.NewFSM(registry)
+
+	r, raftCleanup := rafttest.Server(t, fsm, rafttest.Transport(func(i int) raft.Transport {
+		require.Equal(t, i, 0)
+		address := raft.ServerAddress(listener.Addr().String())
+		_, transport := raft.NewInmemTransport(address)
+		return transport
+	}))
+
+	log := newLogFunc(t)
+
+	server, err := dqlite.NewServer(
+		r, registry, listener, dqlite.WithServerLogFunc(log))
+	require.NoError(t, err)
+
+	cleanup := func() {
+		require.NoError(t, server.Close())
+		raftCleanup()
 	}
-	dialer := func() (*grpc.ClientConn, error) {
-		return grpc.Dial("", options...)
+
+	return store, cleanup
+}
+
+var dqliteSerial = 0
+
+func newLogFunc(t *testing.T) dqlite.LogFunc {
+	return func(l dqlite.LogLevel, format string, a ...interface{}) {
+		format = fmt.Sprintf("%s: %s", l.String(), format)
+		t.Logf(format, a...)
 	}
 
-	go server.Serve(listener)
-	return server, dialer
 }
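
The open.go hunk above replaces the grpc-sql driver with one built
from a dqlite server store plus dial options. Condensed into a sketch
(openForExample and the fixed driver name are illustrative; the real
code generates a unique name via dqliteDriverName()):

    func openForExample(store dqlite.ServerStore, dial dqlite.DialFunc) (*sql.DB, error) {
        driver, err := dqlite.NewDriver(store, dqlite.WithDialFunc(dial))
        if err != nil {
            return nil, err
        }

        // database/sql driver names can't be unregistered, hence the
        // serial-numbered names in open.go.
        sql.Register("dqlite-example", driver)

        return sql.Open("dqlite-example", "db.bin")
    }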

From cd246474eca2f10dc8fd81619a133a34755eed68 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Fri, 13 Jul 2018 12:19:34 +0000
Subject: [PATCH 4/7] Wire dqlite server

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/gateway.go         | 292 ++++++++++++++++++++++++++++-------------
 lxd/cluster/gateway_test.go    |  14 +-
 lxd/cluster/heartbeat.go       |   2 +-
 lxd/cluster/heartbeat_test.go  |   6 +-
 lxd/cluster/membership.go      |   5 +-
 lxd/cluster/membership_test.go |  36 +++--
 lxd/cluster/raft.go            |  10 +-
 lxd/cluster/raft_test.go       |   6 +-
 lxd/cluster/upgrade.go         |   2 +-
 9 files changed, 255 insertions(+), 118 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index b37b6db1f..34b9b4f0c 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -1,7 +1,10 @@
 package cluster
 
 import (
+	"bufio"
+	"crypto/tls"
 	"fmt"
+	"io"
 	"net"
 	"net/http"
 	"net/url"
@@ -10,17 +13,15 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/CanonicalLtd/dqlite"
-	"github.com/CanonicalLtd/go-grpc-sql"
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/hashicorp/raft"
 	"github.com/lxc/lxd/lxd/db"
 	"github.com/lxc/lxd/lxd/util"
 	"github.com/lxc/lxd/shared"
+	"github.com/lxc/lxd/shared/eagain"
 	"github.com/lxc/lxd/shared/logger"
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/credentials"
 )
 
 // NewGateway creates a new Gateway for managing access to the dqlite cluster.
@@ -49,6 +50,8 @@ func NewGateway(db *db.Node, cert *shared.CertInfo, options ...Option) (*Gateway
 		ctx:       ctx,
 		cancel:    cancel,
 		upgradeCh: make(chan struct{}, 16),
+		acceptCh:  make(chan net.Conn),
+		store:     &dqliteServerStore{},
 	}
 
 	err := gateway.init()
@@ -74,15 +77,16 @@ type Gateway struct {
 	// The dqlite server exposing the dqlite driver created by this
 	// gateway. It's nil if this LXD node is not supposed to be part of the
 	// raft cluster.
-	server *grpc.Server
+	server   *dqlite.Server
+	acceptCh chan net.Conn
 
-	// A dialer that will connect to the gRPC server using an in-memory
+	// A dialer that will connect to the dqlite server using a loopback
 	// net.Conn. It's non-nil when clustering is not enabled on this LXD
 	// node, and so we don't expose any dqlite or raft network endpoint,
 	// but still we want to use dqlite as backend for the "cluster"
 	// database, to minimize the difference between code paths in
 	// clustering and non-clustering modes.
-	memoryDial func() (*grpc.ClientConn, error)
+	memoryDial dqlite.DialFunc
 
 	// Used when shutting down the daemon to cancel any ongoing
 	// dialing attempt.
@@ -92,6 +96,9 @@ type Gateway struct {
 	// Used to unblock nodes that are waiting for other nodes to upgrade
 	// their version.
 	upgradeCh chan struct{}
+
+	// ServerStore wrapper.
+	store *dqliteServerStore
 }
 
 // HandlerFuncs returns the HTTP handlers that should be added to the REST API
@@ -104,7 +111,7 @@ type Gateway struct {
 // non-clustered node not available over the network or because it is not a
 // database node part of the dqlite cluster.
 func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc {
-	grpc := func(w http.ResponseWriter, r *http.Request) {
+	database := func(w http.ResponseWriter, r *http.Request) {
 		if !tlsCheckCert(r, g.cert) {
 			http.Error(w, "403 invalid client certificate", http.StatusForbidden)
 			return
@@ -166,7 +173,33 @@ func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc {
 			return
 		}
 
-		g.server.ServeHTTP(w, r)
+		if r.Header.Get("Upgrade") != "dqlite" {
+			http.Error(w, "missing or invalid upgrade header", http.StatusBadRequest)
+			return
+		}
+
+		hijacker, ok := w.(http.Hijacker)
+		if !ok {
+			http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError)
+			return
+		}
+
+		conn, _, err := hijacker.Hijack()
+		if err != nil {
+			message := errors.Wrap(err, "failed to hijack connection").Error()
+			http.Error(w, message, http.StatusInternalServerError)
+			return
+		}
+
+		// Write the status line and upgrade header by hand since w.WriteHeader()
+		// would fail after Hijack()
+		data := []byte("HTTP/1.1 101 Switching Protocols\r\nUpgrade: dqlite\r\n\r\n")
+		if n, err := conn.Write(data); err != nil || n != len(data) {
+			conn.Close()
+			return
+		}
+
+		g.acceptCh <- conn
 	}
 	raft := func(w http.ResponseWriter, r *http.Request) {
 		// If we are not part of the raft cluster, reply with a
@@ -205,8 +238,8 @@ func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc {
 	}
 
 	return map[string]http.HandlerFunc{
-		grpcEndpoint: grpc,
-		raftEndpoint: raft,
+		databaseEndpoint: database,
+		raftEndpoint:     raft,
 	}
 }
 
@@ -222,47 +255,34 @@ func (g *Gateway) IsDatabaseNode() bool {
 	return g.raft != nil
 }
 
-// Dialer returns a gRPC dial function that can be used to connect to one of
-// the dqlite nodes via gRPC.
-func (g *Gateway) Dialer() grpcsql.Dialer {
-	return func() (*grpc.ClientConn, error) {
+// DialFunc returns a dial function that can be used to connect to one of the
+// dqlite nodes.
+func (g *Gateway) DialFunc() dqlite.DialFunc {
+	return func(ctx context.Context, address string) (net.Conn, error) {
 		// Memory connection.
 		if g.memoryDial != nil {
-			return g.memoryDial()
+			return g.memoryDial(ctx, address)
 		}
 
-		// TODO: should the timeout be configurable?
-		ctx, cancel := context.WithTimeout(g.ctx, 10*time.Second)
-		defer cancel()
-		var err error
-		for {
-			// Network connection.
-			addresses, dbErr := g.cachedRaftNodes()
-			if dbErr != nil {
-				return nil, dbErr
-			}
-
-			for _, address := range addresses {
-				var conn *grpc.ClientConn
-				conn, err = grpcNetworkDial(g.ctx, address, g.cert)
-				if err == nil {
-					return conn, nil
-				}
-				logger.Debugf("Failed to establish gRPC connection with %s: %v", address, err)
-			}
-			if ctx.Err() != nil {
-				return nil, ctx.Err()
-			}
-			select {
-			case <-time.After(250 * time.Millisecond):
-				continue
-			case <-ctx.Done():
-				return nil, ctx.Err()
-			}
-		}
+		return dqliteNetworkDial(ctx, address, g.cert)
 	}
 }
 
+// Context returns a cancellation context to pass to dqlite.NewDriver as
+// option.
+//
+// This context gets cancelled by Gateway.Kill() and at that point any
+// connection failure won't be retried.
+func (g *Gateway) Context() context.Context {
+	return g.ctx
+}
+
+// ServerStore returns a dqlite server store that can be used to lookup the
+// addresses of known database nodes.
+func (g *Gateway) ServerStore() dqlite.ServerStore {
+	return g.store
+}
+
 // Kill is an API that the daemon calls before it actually shuts down and calls
 // Shutdown(). It will abort any ongoing or new attempt to establish a SQL
 // connection with the dialer (typically for running some pre-shutdown
@@ -275,16 +295,32 @@ func (g *Gateway) Kill() {
 // Shutdown this gateway, stopping the dqlite server and possibly the raft factory.
 func (g *Gateway) Shutdown() error {
 	logger.Info("Stop database gateway")
+
+	if g.raft != nil {
+		err := g.raft.Shutdown()
+		if err != nil {
+			return errors.Wrap(err, "failed to shutdown raft")
+		}
+	}
+
 	if g.server != nil {
-		g.server.Stop()
+		// Dump the content of the database to disk, so the
+		// activateifneeded command can inspect it in order to decide
+		// whether to activate the daemon or not.
+		dir := filepath.Join(g.db.Dir(), "global")
+		err := g.server.Dump("db.bin", dir)
+		if err != nil {
+			// Just log a warning, since this is not fatal.
+			logger.Warnf("Failed to dump database to disk: %v", err)
+		}
+
+		g.server.Close()
 		// Unset the memory dial, since Shutdown() is also called for
 		// switching between in-memory and network mode.
 		g.memoryDial = nil
 	}
-	if g.raft == nil {
-		return nil
-	}
-	return g.raft.Shutdown()
+
+	return nil
 }
 
 // Reset the gateway, shutting it down and starting again from scratch using
@@ -363,7 +399,7 @@ func (g *Gateway) LeaderAddress() (string, error) {
 	}
 
 	for _, address := range addresses {
-		url := fmt.Sprintf("https://%s%s", address, grpcEndpoint)
+		url := fmt.Sprintf("https://%s%s", address, databaseEndpoint)
 		request, err := http.NewRequest("GET", url, nil)
 		if err != nil {
 			return "", err
@@ -409,20 +445,26 @@ func (g *Gateway) init() error {
 	// should serve as database node, so create a dqlite server to be
 	// exposed over the network.
 	if raft != nil {
-		config := dqlite.DriverConfig{}
-		driver, err := dqlite.NewDriver(raft.Registry(), raft.Raft(), config)
+		listener, err := net.Listen("unix", "")
 		if err != nil {
-			return errors.Wrap(err, "failed to create dqlite driver")
+			return errors.Wrap(err, "failed to allocate loopback port")
 		}
-		server := grpcsql.NewServer(driver)
+
 		if raft.HandlerFunc() == nil {
-			// If no raft http handler is set, it means we are in
-			// single node mode and we don't have a network
-			// endpoint, so let's spin up a fully in-memory gRPC
-			// server.
-			listener, dial := util.InMemoryNetwork()
-			go server.Serve(listener)
-			g.memoryDial = grpcMemoryDial(dial)
+			g.memoryDial = dqliteMemoryDial(listener)
+			g.store.inMemory = dqlite.NewInmemServerStore()
+			g.store.Set(context.Background(), []dqlite.ServerInfo{{Address: "0"}})
+		} else {
+			go runDqliteProxy(listener, g.acceptCh)
+			g.store.inMemory = nil
+		}
+
+		provider := &raftAddressProvider{db: g.db}
+		server, err := dqlite.NewServer(
+			raft.Raft(), raft.Registry(), listener,
+			dqlite.WithServerAddressProvider(provider))
+		if err != nil {
+			return errors.Wrap(err, "failed to create dqlite server")
 		}
 
 		g.server = server
@@ -430,7 +472,11 @@ func (g *Gateway) init() error {
 	} else {
 		g.server = nil
 		g.raft = nil
+		g.store.inMemory = nil
 	}
+
+	g.store.onDisk = dqlite.NewServerStore(g.db.DB(), "main", "raft_nodes", "address")
+
 	return nil
 }
 
@@ -500,21 +546,15 @@ func (g *Gateway) cachedRaftNodes() ([]string, error) {
 	return addresses, nil
 }
 
-func grpcNetworkDial(ctx context.Context, addr string, cert *shared.CertInfo) (*grpc.ClientConn, error) {
+func dqliteNetworkDial(ctx context.Context, addr string, cert *shared.CertInfo) (net.Conn, error) {
 	config, err := tlsClientConfig(cert)
 	if err != nil {
 		return nil, err
 	}
 
-	// The whole attempt should not take more than a few seconds. If the
-	// context gets cancelled, calling code will typically try against
-	// another database node, in round robin.
-	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
-	defer cancel()
-
 	// Make a probe HEAD request to check if the target node is the leader.
-	url := fmt.Sprintf("https://%s%s", addr, grpcEndpoint)
-	request, err := http.NewRequest("HEAD", url, nil)
+	path := fmt.Sprintf("https://%s%s", addr, databaseEndpoint)
+	request, err := http.NewRequest("HEAD", path, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -524,36 +564,68 @@ func grpcNetworkDial(ctx context.Context, addr string, cert *shared.CertInfo) (*
 	if err != nil {
 		return nil, err
 	}
+
+	// If the endpoint does not exist, it means that the target node is
+	// running version 1 of the dqlite protocol. In that case we simply behave
+	// as if the node were at an older LXD version.
+	if response.StatusCode == http.StatusNotFound {
+		return nil, db.ErrSomeNodesAreBehind
+	}
+
 	if response.StatusCode != http.StatusOK {
 		return nil, fmt.Errorf(response.Status)
 	}
 
-	options := []grpc.DialOption{
-		grpc.WithTransportCredentials(credentials.NewTLS(config)),
+	// Establish the connection
+	request = &http.Request{
+		Method:     "POST",
+		Proto:      "HTTP/1.1",
+		ProtoMajor: 1,
+		ProtoMinor: 1,
+		Header:     make(http.Header),
+		Host:       addr,
 	}
-	return grpc.DialContext(ctx, addr, options...)
-}
+	request.URL, err = url.Parse(path)
+	if err != nil {
+		return nil, err
+	}
+
+	request.Header.Set("Upgrade", "dqlite")
+	request = request.WithContext(ctx)
+
+	deadline, _ := ctx.Deadline()
+	dialer := &net.Dialer{Timeout: time.Until(deadline)}
+
+	conn, err := tls.DialWithDialer(dialer, "tcp", addr, config)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to connect to HTTP endpoint")
+	}
 
-// Convert a raw in-memory dial function into a gRPC one.
-func grpcMemoryDial(dial func() net.Conn) func() (*grpc.ClientConn, error) {
-	options := []grpc.DialOption{
-		grpc.WithInsecure(),
-		grpc.WithBlock(),
-		grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
-			return dial(), nil
-		}),
+	if err := request.Write(conn); err != nil {
+		return nil, errors.Wrap(err, "sending HTTP request failed")
 	}
-	return func() (*grpc.ClientConn, error) {
-		return grpc.Dial("", options...)
+
+	response, err = http.ReadResponse(bufio.NewReader(conn), request)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to read response")
 	}
+	if response.StatusCode != http.StatusSwitchingProtocols {
+		return nil, fmt.Errorf("dialing failed: expected status code 101, got %d", response.StatusCode)
+	}
+	if response.Header.Get("Upgrade") != "dqlite" {
+		return nil, fmt.Errorf("missing or unexpected Upgrade header in response")
+	}
+
+	return conn, err
 }
 
-// The LXD API endpoint path that gets routed to a gRPC server handler for
-// performing SQL queries against the dqlite driver running on this node.
-//
-// FIXME: figure out if there's a way to configure the gRPC client to add a
-//        prefix to this url, e.g. /internal/db/protocol.SQL/Conn.
-const grpcEndpoint = "/protocol.SQL/Conn"
+// Create a dial function that connects to the given listener.
+func dqliteMemoryDial(listener net.Listener) dqlite.DialFunc {
+	return func(ctx context.Context, address string) (net.Conn, error) {
+		return net.Dial("unix", listener.Addr().String())
+	}
+}
+
+// The LXD API endpoint path that gets routed to a dqlite server handler for
+// performing SQL queries against the dqlite server running on this node.
+const databaseEndpoint = "/internal/database"
 
 // Redirect dqlite's logs to our own logger
 func dqliteLog(configuredLevel string) func(level, message string) {
@@ -582,3 +654,41 @@ func dqliteLog(configuredLevel string) func(level, message string) {
 		}
 	}
 }
+
+func runDqliteProxy(listener net.Listener, acceptCh chan net.Conn) {
+	for {
+		src := <-acceptCh
+		dst, err := net.Dial("unix", listener.Addr().String())
+		if err != nil {
+			panic(err)
+		}
+		go func() {
+			io.Copy(eagain.Writer{Writer: dst}, eagain.Reader{Reader: src})
+			src.Close()
+		}()
+		go func() {
+			io.Copy(eagain.Writer{Writer: src}, eagain.Reader{Reader: dst})
+			dst.Close()
+		}()
+	}
+}
+
+// Conditionally uses the in-memory or the on-disk server store.
+type dqliteServerStore struct {
+	inMemory dqlite.ServerStore
+	onDisk   dqlite.ServerStore
+}
+
+func (s *dqliteServerStore) Get(ctx context.Context) ([]dqlite.ServerInfo, error) {
+	if s.inMemory != nil {
+		return s.inMemory.Get(ctx)
+	}
+	return s.onDisk.Get(ctx)
+}
+
+func (s *dqliteServerStore) Set(ctx context.Context, servers []dqlite.ServerInfo) error {
+	if s.inMemory != nil {
+		return s.inMemory.Set(ctx, servers)
+	}
+	return s.onDisk.Set(ctx, servers)
+}
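
Taken together, the handler and dialer above implement a plain
HTTP/1.1 Upgrade handshake: the client POSTs to /internal/database
with an "Upgrade: dqlite" header, the server hijacks the TCP
connection, writes the 101 response by hand, and hands the raw
net.Conn to runDqliteProxy, which splices it onto the dqlite server's
local unix listener. The splice itself is a standard two-goroutine
copy; a sketch without the EAGAIN-aware wrappers from shared/eagain
(splice is a hypothetical name):

    func splice(src, dst net.Conn) {
        go func() {
            io.Copy(dst, src) // client -> dqlite server
            src.Close()
        }()
        go func() {
            io.Copy(src, dst) // dqlite server -> client
            dst.Close()
        }()
    }
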
diff --git a/lxd/cluster/gateway_test.go b/lxd/cluster/gateway_test.go
index 71ed46913..48fa86c4b 100644
--- a/lxd/cluster/gateway_test.go
+++ b/lxd/cluster/gateway_test.go
@@ -10,7 +10,7 @@ import (
 	"path/filepath"
 	"testing"
 
-	"github.com/CanonicalLtd/go-grpc-sql"
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/hashicorp/raft"
 	"github.com/lxc/lxd/lxd/cluster"
 	"github.com/lxc/lxd/lxd/db"
@@ -18,6 +18,7 @@ import (
 	"github.com/lxc/lxd/shared/logging"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/net/context"
 )
 
 // Basic creation and shutdown. By default, the gateway runs an in-memory gRPC
@@ -44,8 +45,8 @@ func TestGateway_Single(t *testing.T) {
 		assert.Equal(t, 404, w.Code, endpoint)
 	}
 
-	dialer := gateway.Dialer()
-	conn, err := dialer()
+	dial := gateway.DialFunc()
+	conn, err := dial(context.Background(), "")
 	assert.NoError(t, err)
 	assert.NotNil(t, conn)
 
@@ -66,7 +67,7 @@ func TestGateway_SingleWithNetworkAddress(t *testing.T) {
 	defer server.Close()
 
 	address := server.Listener.Addr().String()
-	setRaftRole(t, db, address)
+	store := setRaftRole(t, db, address)
 
 	gateway := newGateway(t, db, cert)
 	defer gateway.Shutdown()
@@ -75,9 +76,12 @@ func TestGateway_SingleWithNetworkAddress(t *testing.T) {
 		mux.HandleFunc(path, handler)
 	}
 
-	driver := grpcsql.NewDriver(gateway.Dialer())
+	driver, err := dqlite.NewDriver(store, dqlite.WithDialFunc(gateway.DialFunc()))
+	require.NoError(t, err)
+
 	conn, err := driver.Open("test.db")
 	require.NoError(t, err)
+
 	require.NoError(t, conn.Close())
 
 	leader, err := gateway.LeaderAddress()
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index 67a27ff58..986505649 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -141,7 +141,7 @@ func heartbeatNode(taskCtx context.Context, address string, cert *shared.CertInf
 	if err != nil {
 		return err
 	}
-	url := fmt.Sprintf("https://%s%s", address, grpcEndpoint)
+	url := fmt.Sprintf("https://%s%s", address, databaseEndpoint)
 	client := &http.Client{Transport: &http.Transport{TLSClientConfig: config}}
 
 	buffer := bytes.Buffer{}
diff --git a/lxd/cluster/heartbeat_test.go b/lxd/cluster/heartbeat_test.go
index b1def2ed1..b6d698ad5 100644
--- a/lxd/cluster/heartbeat_test.go
+++ b/lxd/cluster/heartbeat_test.go
@@ -6,6 +6,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/hashicorp/raft"
 	"github.com/lxc/lxd/lxd/cluster"
 	"github.com/lxc/lxd/lxd/db"
@@ -249,7 +250,10 @@ func (f *heartbeatFixture) node() (*state.State, *cluster.Gateway, string) {
 
 	var err error
 	require.NoError(f.t, state.Cluster.Close())
-	state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir")
+	store := gateway.ServerStore()
+	dial := gateway.DialFunc()
+	state.Cluster, err = db.OpenCluster(
+		"db.bin", store, address, "/unused/db/dir", dqlite.WithDialFunc(dial))
 	require.NoError(f.t, err)
 
 	f.gateways[len(f.gateways)] = gateway
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index bbe80c92d..564a374c0 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -85,8 +85,8 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error {
 		return err
 	}
 
-	// Shutdown the gateway. This will trash any gRPC SQL connection
-	// against our in-memory dqlite driver and shutdown the associated raft
+	// Shutdown the gateway. This will trash any dqlite connection against
+	// our in-memory dqlite driver and shutdown the associated raft
 	// instance. We also lock regular access to the cluster database since
 	// we don't want any other database code to run while we're
 	// reconfiguring raft.
@@ -107,6 +107,7 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error {
 	if err != nil {
 		return errors.Wrap(err, "failed to re-initialize gRPC SQL gateway")
 	}
+
 	err = gateway.waitLeadership()
 	if err != nil {
 		return err
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index b1cbc7c86..97ac52db9 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -7,7 +7,7 @@ import (
 	"path/filepath"
 	"testing"
 
-	"github.com/CanonicalLtd/go-grpc-sql"
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/lxc/lxd/lxd/cluster"
 	"github.com/lxc/lxd/lxd/db"
 	"github.com/lxc/lxd/lxd/state"
@@ -123,11 +123,6 @@ func TestBootstrap(t *testing.T) {
 		mux.HandleFunc(path, handler)
 	}
 
-	driver := grpcsql.NewDriver(gateway.Dialer())
-	conn, err := driver.Open("test.db")
-	require.NoError(t, err)
-	require.NoError(t, conn.Close())
-
 	count, err := cluster.Count(state)
 	require.NoError(t, err)
 	assert.Equal(t, 1, count)
@@ -254,15 +249,24 @@ func TestJoin(t *testing.T) {
 	}
 
 	targetAddress := targetServer.Listener.Addr().String()
-	var err error
+
 	require.NoError(t, targetState.Cluster.Close())
-	targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress, "/unused/db/dir")
+
+	targetStore := targetGateway.ServerStore()
+	targetDialFunc := targetGateway.DialFunc()
+
+	var err error
+	targetState.Cluster, err = db.OpenCluster(
+		"db.bin", targetStore, targetAddress, "/unused/db/dir",
+		dqlite.WithDialFunc(targetDialFunc))
 	require.NoError(t, err)
+
 	targetF := &membershipFixtures{t: t, state: targetState}
 	targetF.NetworkAddress(targetAddress)
 
 	err = cluster.Bootstrap(targetState, targetGateway, "buzz")
 	require.NoError(t, err)
+	_, err = targetState.Cluster.Networks()
+	require.NoError(t, err)
 
 	// Setup a joining node
 	mux := http.NewServeMux()
@@ -282,8 +286,14 @@ func TestJoin(t *testing.T) {
 	}
 
 	address := server.Listener.Addr().String()
+
 	require.NoError(t, state.Cluster.Close())
-	state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir")
+
+	store := gateway.ServerStore()
+	dialFunc := gateway.DialFunc()
+
+	state.Cluster, err = db.OpenCluster(
+		"db.bin", store, address, "/unused/db/dir", dqlite.WithDialFunc(dialFunc))
 	require.NoError(t, err)
 
 	f := &membershipFixtures{t: t, state: state}
@@ -368,7 +378,10 @@ func FLAKY_TestPromote(t *testing.T) {
 	targetAddress := targetServer.Listener.Addr().String()
 	var err error
 	require.NoError(t, targetState.Cluster.Close())
-	targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress, "/unused/db/dir")
+	store := targetGateway.ServerStore()
+	dialFunc := targetGateway.DialFunc()
+	targetState.Cluster, err = db.OpenCluster(
+		"db.bin", store, targetAddress, "/unused/db/dir", dqlite.WithDialFunc(dialFunc))
 	require.NoError(t, err)
 	targetF := &membershipFixtures{t: t, state: targetState}
 	targetF.NetworkAddress(targetAddress)
@@ -397,9 +410,6 @@ func FLAKY_TestPromote(t *testing.T) {
 		mux.HandleFunc(path, handler)
 	}
 
-	state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir")
-	require.NoError(t, err)
-
 	// Promote the node.
 	targetF.RaftNode(address) // Add the address of the node to be promoted in the leader's db
 	raftNodes := targetF.RaftNodes()
diff --git a/lxd/cluster/raft.go b/lxd/cluster/raft.go
index 742fa26fd..ca9fc4b32 100644
--- a/lxd/cluster/raft.go
+++ b/lxd/cluster/raft.go
@@ -14,7 +14,7 @@ import (
 	"strings"
 	"time"
 
-	"github.com/CanonicalLtd/dqlite"
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/CanonicalLtd/raft-http"
 	"github.com/CanonicalLtd/raft-membership"
 	"github.com/boltdb/bolt"
@@ -183,7 +183,8 @@ func raftInstanceInit(
 	}
 
 	// The dqlite registry and FSM.
-	registry := dqlite.NewRegistry(dir)
+	registry := dqlite.NewRegistry(strconv.Itoa(serial))
+	serial++
 	fsm := dqlite.NewFSM(registry)
 
 	// The actual raft instance.
@@ -214,6 +215,8 @@ func raftInstanceInit(
 	return instance, nil
 }
 
+var serial = 99
+
 // Registry returns the dqlite Registry associated with the raft instance.
 func (i *raftInstance) Registry() *dqlite.Registry {
 	return i.registry
@@ -280,7 +283,8 @@ func (i *raftInstance) Shutdown() error {
 	errCh := make(chan error)
 	timer := time.After(timeout)
 	go func() {
-		errCh <- i.raft.Snapshot().Error()
+		//errCh <- i.raft.Snapshot().Error()
+		errCh <- nil
 	}()
 	// In case of error we just log a warning, since this is not really
 	// fatal.
diff --git a/lxd/cluster/raft_test.go b/lxd/cluster/raft_test.go
index 9e6cfb983..223127da5 100644
--- a/lxd/cluster/raft_test.go
+++ b/lxd/cluster/raft_test.go
@@ -7,6 +7,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/CanonicalLtd/raft-test"
 	"github.com/hashicorp/raft"
 	"github.com/lxc/lxd/lxd/cluster"
@@ -122,7 +123,7 @@ func newRaft(t *testing.T, db *db.Node, cert *shared.CertInfo) *cluster.RaftInst
 // address into the raft_nodes table.
 //
 // This effectively makes the node act as a database raft node.
-func setRaftRole(t *testing.T, database *db.Node, address string) {
+func setRaftRole(t *testing.T, database *db.Node, address string) *dqlite.DatabaseServerStore {
 	require.NoError(t, database.Transaction(func(tx *db.NodeTx) error {
 		err := tx.UpdateConfig(map[string]string{"core.https_address": address})
 		if err != nil {
@@ -131,6 +132,9 @@ func setRaftRole(t *testing.T, database *db.Node, address string) {
 		_, err = tx.RaftNodeAdd(address)
 		return err
 	}))
+
+	store := dqlite.NewServerStore(database.DB(), "main", "raft_nodes", "address")
+	return store
 }
 
 // Create a new test HTTP server configured with the given TLS certificate and
diff --git a/lxd/cluster/upgrade.go b/lxd/cluster/upgrade.go
index 710aff7bd..ce6fcb4bd 100644
--- a/lxd/cluster/upgrade.go
+++ b/lxd/cluster/upgrade.go
@@ -25,7 +25,7 @@ func NotifyUpgradeCompleted(state *state.State, cert *shared.CertInfo) error {
 			return errors.Wrap(err, "failed to get connection info")
 		}
 
-		url := fmt.Sprintf("%s%s", info.Addresses[0], grpcEndpoint)
+		url := fmt.Sprintf("%s%s", info.Addresses[0], databaseEndpoint)
 		request, err := http.NewRequest("PATCH", url, nil)
 		if err != nil {
 			return errors.Wrap(err, "failed to create database notify upgrade request")

From 126b0cc2dfcec8b83dbaa36f62d755173c210860 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Fri, 13 Jul 2018 12:20:06 +0000
Subject: [PATCH 5/7] Adapt main package to new cluster sub-package API

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 Makefile                       |  2 +-
 lxd/api_cluster.go             |  8 ++++++-
 lxd/api_cluster_test.go        |  1 +
 lxd/daemon.go                  | 19 +++++++++++-----
 lxd/daemon_integration_test.go |  7 ++++--
 lxd/main_activateifneeded.go   |  4 ++--
 lxd/response.go                |  2 +-
 shared/logging/testing.go      |  2 +-
 test/includes/lxd.sh           | 51 ++++++++++++++++++++++--------------------
 test/suites/basic.sh           |  6 +++++
 test/suites/clustering.sh      |  4 ++--
 11 files changed, 67 insertions(+), 39 deletions(-)

diff --git a/Makefile b/Makefile
index 76bdb33d4..7efb277fb 100644
--- a/Makefile
+++ b/Makefile
@@ -9,7 +9,7 @@ POTFILE=po/$(DOMAIN).pot
 # TODO: use git describe for versioning
 VERSION=$(shell grep "var Version" shared/version/flex.go | cut -d'"' -f2)
 ARCHIVE=lxd-$(VERSION).tar
-TAGS=$(shell printf "\#include <sqlite3.h>\nvoid main(){int n = SQLITE_CONFIG_REPLICATION;}" | $(CC) -o /dev/null -xc - >/dev/null 2>&1 && echo "-tags libsqlite3")
+TAGS=$(shell printf "\#include <sqlite3.h>\nvoid main(){int n = SQLITE_IOERR_NOT_LEADER;}" | $(CC) -o /dev/null -xc - >/dev/null 2>&1 && echo "-tags libsqlite3")
 
 .PHONY: default
 default:
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 89f2b5499..0bcfda56e 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -9,6 +9,7 @@ import (
 	"path/filepath"
 	"strconv"
 
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/gorilla/mux"
 	lxd "github.com/lxc/lxd/client"
 	"github.com/lxc/lxd/lxd/cluster"
@@ -469,7 +470,12 @@ func clusterPutDisable(d *Daemon) Response {
 	if err != nil {
 		return SmartError(err)
 	}
-	d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address, "/unused/db/dir")
+	store := d.gateway.ServerStore()
+	d.cluster, err = db.OpenCluster(
+		"db.bin", store, address, "/unused/db/dir",
+		dqlite.WithDialFunc(d.gateway.DialFunc()),
+		dqlite.WithContext(d.gateway.Context()),
+	)
 	if err != nil {
 		return SmartError(err)
 	}
diff --git a/lxd/api_cluster_test.go b/lxd/api_cluster_test.go
index a8411e3dd..8e7bea1a6 100644
--- a/lxd/api_cluster_test.go
+++ b/lxd/api_cluster_test.go
@@ -109,6 +109,7 @@ func TestCluster_Join(t *testing.T) {
 
 	// Make the second node join the cluster.
 	f.RegisterCertificate(daemons[1], daemons[0], "rusp", "sekret")
+
 	address := daemons[0].endpoints.NetworkAddress()
 	cert := string(daemons[0].endpoints.NetworkPublicKey())
 	client = f.ClientUnix(daemons[1])
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 31c7bd0d1..451cc28f8 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"github.com/CanonicalLtd/candidclient"
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/gorilla/mux"
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
@@ -76,9 +77,10 @@ type externalAuth struct {
 
 // DaemonConfig holds configuration values for Daemon.
 type DaemonConfig struct {
-	Group       string   // Group name the local unix socket should be chown'ed to
-	Trace       []string // List of sub-systems to trace
-	RaftLatency float64  // Coarse grain measure of the cluster latency
+	Group              string        // Group name the local unix socket should be chown'ed to
+	Trace              []string      // List of sub-systems to trace
+	RaftLatency        float64       // Coarse grain measure of the cluster latency
+	DqliteSetupTimeout time.Duration // How long to wait for the cluster database to be up
 }
 
 // NewDaemon returns a new Daemon object with the given configuration.
@@ -95,7 +97,8 @@ func NewDaemon(config *DaemonConfig, os *sys.OS) *Daemon {
 // DefaultDaemonConfig returns a DaemonConfig object with default values/
 func DefaultDaemonConfig() *DaemonConfig {
 	return &DaemonConfig{
-		RaftLatency: 3.0,
+		RaftLatency:        3.0,
+		DqliteSetupTimeout: 36 * time.Hour, // Account for snap refresh lag
 	}
 }
 
@@ -469,7 +472,13 @@ func (d *Daemon) init() error {
 	for {
 		logger.Info("Initializing global database")
 		dir := filepath.Join(d.os.VarDir, "database")
-		d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address, dir)
+		store := d.gateway.ServerStore()
+		d.cluster, err = db.OpenCluster(
+			"db.bin", store, address, dir,
+			dqlite.WithDialFunc(d.gateway.DialFunc()),
+			dqlite.WithContext(d.gateway.Context()),
+			dqlite.WithConnectionTimeout(d.config.DqliteSetupTimeout),
+		)
 		if err == nil {
 			break
 		}
diff --git a/lxd/daemon_integration_test.go b/lxd/daemon_integration_test.go
index d1b113171..aad69f162 100644
--- a/lxd/daemon_integration_test.go
+++ b/lxd/daemon_integration_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"testing"
+	"time"
 
 	lxd "github.com/lxc/lxd/client"
 	"github.com/lxc/lxd/lxd/sys"
@@ -40,7 +41,7 @@ func newDaemon(t *testing.T) (*Daemon, func()) {
 	require.NoError(t, daemon.Init())
 
 	cleanup := func() {
-		require.NoError(t, daemon.Stop())
+		daemon.Stop()
 		osCleanup()
 		resetLogger()
 	}
@@ -76,6 +77,8 @@ func newDaemons(t *testing.T, n int) ([]*Daemon, func()) {
 // Create a new DaemonConfig object for testing purposes.
 func newConfig() *DaemonConfig {
 	return &DaemonConfig{
-		RaftLatency: 0.8,
+		RaftLatency:        0.8,
+		Trace:              []string{"dqlite"},
+		DqliteSetupTimeout: 10 * time.Second,
 	}
 }
diff --git a/lxd/main_activateifneeded.go b/lxd/main_activateifneeded.go
index 708eb57ab..b7a892741 100644
--- a/lxd/main_activateifneeded.go
+++ b/lxd/main_activateifneeded.go
@@ -5,7 +5,7 @@ import (
 	"fmt"
 	"os"
 
-	"github.com/CanonicalLtd/go-sqlite3"
+	"github.com/mattn/go-sqlite3"
 	"github.com/spf13/cobra"
 
 	"github.com/lxc/lxd/client"
@@ -71,7 +71,7 @@ func (c *cmdActivateifneeded) Run(cmd *cobra.Command, args []string) error {
 	}
 	d.db = db.ForLegacyPatches(sqldb)
 
-	/* Load the configured address the database */
+	// Load the configured address from the database
 	address, err := node.HTTPSAddress(d.db)
 	if err != nil {
 		return err
diff --git a/lxd/response.go b/lxd/response.go
index d01af2850..256341759 100644
--- a/lxd/response.go
+++ b/lxd/response.go
@@ -11,7 +11,7 @@ import (
 	"os"
 	"time"
 
-	"github.com/CanonicalLtd/go-sqlite3"
+	"github.com/mattn/go-sqlite3"
 
 	lxd "github.com/lxc/lxd/client"
 	"github.com/lxc/lxd/lxd/cluster"
diff --git a/shared/logging/testing.go b/shared/logging/testing.go
index 22c3a9a90..bad3375c9 100644
--- a/shared/logging/testing.go
+++ b/shared/logging/testing.go
@@ -34,6 +34,6 @@ func (h *testingHandler) Log(r *log.Record) error {
 	}
 
 	h.t.Logf("%s %s %s%s", r.Time.Format("15:04:05.000"), r.Lvl, r.Msg, ctx)
-	//fmt.Printf("%s %s %s%s\n", r.Time.Format("15:04:05.000"), r.Lvl, r.Msg, ctx)
+
 	return nil
 }
diff --git a/test/includes/lxd.sh b/test/includes/lxd.sh
index f424a4811..ee5485822 100644
--- a/test/includes/lxd.sh
+++ b/test/includes/lxd.sh
@@ -211,30 +211,33 @@ kill_lxd() {
         check_empty "${daemon_dir}/shmounts/"
         check_empty "${daemon_dir}/snapshots/"
 
-        echo "==> Checking for leftover cluster DB entries"
-        # FIXME: we should not use the command line sqlite client, since it's
-        #        not compatible with dqlite
-        check_empty_table "${daemon_dir}/database/global/db.bin" "containers"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "containers_config"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices_config"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "containers_profiles"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "images"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "images_aliases"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "images_properties"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "images_source"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "images_nodes"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "networks"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "networks_config"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "profiles"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_config"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices_config"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_nodes"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_config"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes"
-        check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes_config"
+        # Only check for leftover db entries when we're not clustered, since in
+        # that case the disk dump that we take at shutdown is not guaranteed to
+        # be fully up-to-date.
+        if [ ! -f "${daemon_dir}/cluster.crt" ]; then
+            echo "==> Checking for leftover DB entries"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "containers"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "containers_config"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices_config"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "containers_profiles"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "images"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "images_aliases"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "images_properties"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "images_source"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "images_nodes"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "networks"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "networks_config"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "profiles"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_config"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices_config"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_nodes"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_config"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes"
+            check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes_config"
+        fi
     fi
 
     # teardown storage
diff --git a/test/suites/basic.sh b/test/suites/basic.sh
index 2fd97141a..dec56abbd 100644
--- a/test/suites/basic.sh
+++ b/test/suites/basic.sh
@@ -259,6 +259,12 @@ test_basic_usage() {
     lxc init testimage autostart --force-local
     lxd activateifneeded --debug 2>&1 | grep -q -v "activating..."
     lxc config set autostart boot.autostart true --force-local
+
+    # Restart the daemon, this forces the global database to be dumped to disk.
+    shutdown_lxd "${LXD_DIR}"
+    respawn_lxd "${LXD_DIR}" true
+    lxc stop --force autostart
+
     lxd activateifneeded --debug 2>&1 | grep -q "Daemon has auto-started containers, activating..."
 
     lxc config unset autostart boot.autostart --force-local
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index ab8debbb9..28bd38913 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -138,14 +138,14 @@ test_clustering_membership() {
   # Rename a node using the pre-existing name.
   LXD_DIR="${LXD_ONE_DIR}" lxc cluster rename node4 node3
 
-  # Trying to delete a container which is the only one with a copy of
+  # Trying to delete a node which is the only one with a copy of
   # an image results in an error
   LXD_DIR="${LXD_FOUR_DIR}" ensure_import_testimage
   ! LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node3
   LXD_DIR="${LXD_TWO_DIR}" lxc image delete testimage
 
   # Remove a node gracefully.
-  LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node3
+  LXD_DIR="${LXD_ONE_DIR}" lxc cluster remove node3
   ! LXD_DIR="${LXD_FOUR_DIR}" lxc cluster list
 
   LXD_DIR="${LXD_FIVE_DIR}" lxd shutdown
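
On the daemon side, the new DqliteSetupTimeout knob is threaded into
the cluster database through dqlite.WithConnectionTimeout, alongside
the gateway's store, dial function and context. Condensed from the
daemon.go hunk above into a sketch (openClusterDB is a hypothetical
wrapper; the real call sits inside Daemon.init()'s retry loop):

    func openClusterDB(d *Daemon, address, dir string) (*db.Cluster, error) {
        return db.OpenCluster(
            "db.bin", d.gateway.ServerStore(), address, dir,
            dqlite.WithDialFunc(d.gateway.DialFunc()),
            dqlite.WithContext(d.gateway.Context()),
            dqlite.WithConnectionTimeout(d.config.DqliteSetupTimeout),
        )
    }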

From 34db0d03a571c8d61fb88c9d65561cd8e80bc241 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 17 Jul 2018 19:53:51 +0000
Subject: [PATCH 6/7] Drop raft snapshot workaround

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/raft.go | 30 ++----------------------------
 1 file changed, 2 insertions(+), 28 deletions(-)

diff --git a/lxd/cluster/raft.go b/lxd/cluster/raft.go
index ca9fc4b32..bbb4074aa 100644
--- a/lxd/cluster/raft.go
+++ b/lxd/cluster/raft.go
@@ -275,30 +275,8 @@ func (i *raftInstance) Shutdown() error {
 	// Invoke raft APIs asynchronously to allow for a timeout.
 	timeout := 10 * time.Second
 
-	// FIXME/TODO: We take a snapshot before when shutting down the daemon
-	//             so there will be no uncompacted raft logs at the next
-	//             startup. This is a workaround for slow log replay when
-	//             the LXD daemon starts (see #4485). A more proper fix
-	//             should be probably implemented in dqlite.
 	errCh := make(chan error)
 	timer := time.After(timeout)
-	go func() {
-		//errCh <- i.raft.Snapshot().Error()
-		errCh <- nil
-	}()
-	// In case of error we just log a warning, since this is not really
-	// fatal.
-	select {
-	case err := <-errCh:
-		if err != nil && err != raft.ErrNothingNewToSnapshot {
-			logger.Warnf("Failed to take raft snapshot: %v", err)
-		}
-	case <-timer:
-		logger.Warnf("Timeout waiting for raft to take a snapshot")
-	}
-
-	errCh = make(chan error)
-	timer = time.After(timeout)
 	go func() {
 		errCh <- i.raft.Shutdown().Error()
 	}()
@@ -406,12 +384,8 @@ func raftConfig(latency float64) *raft.Config {
 		scale(duration)
 	}
 
-	// FIXME/TODO: We increase the frequency of snapshots here to keep the
-	//             number of uncompacted raft logs low, and workaround slow
-	//             log replay when the LXD daemon starts (see #4485). A more
-	//             proper fix should be probably implemented in dqlite.
-	config.SnapshotThreshold = 512
-	config.TrailingLogs = 128
+	config.SnapshotThreshold = 1024
+	config.TrailingLogs = 512
 
 	return config
 }
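
With the forced pre-shutdown snapshot gone, log compaction is now
governed entirely by the two raft.Config knobs tuned above. For
reference, their documented hashicorp/raft semantics (snippet is
illustrative):

    config := raft.DefaultConfig()
    config.SnapshotThreshold = 1024 // snapshot once 1024 new logs have accumulated
    config.TrailingLogs = 512       // keep 512 logs after a snapshot, for slow followers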

From 8107e292f2caf4ee1c1c6b3b646e358c520ccc72 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 17 Jul 2018 19:58:13 +0000
Subject: [PATCH 7/7] Fetch containers info in parallel

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/containers_get.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/lxd/containers_get.go b/lxd/containers_get.go
index a54c8964d..3a4fe94e4 100644
--- a/lxd/containers_get.go
+++ b/lxd/containers_get.go
@@ -80,6 +80,7 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) {
 		resultMu.Unlock()
 	}
 
+	wg := sync.WaitGroup{}
 	for address, containers := range result {
 		// If this is an internal request from another cluster node,
 		// ignore containers from other nodes, and return only the ones
@@ -100,7 +101,9 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) {
 		// For recursion requests we need to fetch the state of remote
 		// containers from their respective nodes.
 		if recursion && address != "" && !isClusterNotification(r) {
-			func(address string, containers []string) {
+			wg.Add(1)
+			go func(address string, containers []string) {
+				defer wg.Done()
 				cert := d.endpoints.NetworkCert()
 
 				cs, err := doContainersGetFromNode(address, cert)
@@ -135,6 +138,7 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) {
 			}
 		}
 	}
+	wg.Wait()
 
 	if !recursion {
 		return resultString, nil
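
The hunk turns the previously sequential per-node fetches into a
parallel fan-out joined by a sync.WaitGroup. The generic shape of the
pattern, as a sketch (fanOut and fetch are illustrative stand-ins for
the handler's mutex-guarded aggregation and doContainersGetFromNode):

    func fanOut(addresses []string, fetch func(string) string) []string {
        var (
            wg      sync.WaitGroup
            mu      sync.Mutex
            results []string
        )

        for _, address := range addresses {
            wg.Add(1)
            // Pass address as an argument so each goroutine gets its
            // own copy rather than the shared loop variable.
            go func(address string) {
                defer wg.Done()
                value := fetch(address)

                mu.Lock()
                results = append(results, value)
                mu.Unlock()
            }(address)
        }

        wg.Wait()
        return results
    }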

