[lxc-devel] [lxd/master] Wire new dqlite implementation
freeekanayaka on Github
lxc-bot at linuxcontainers.org
Mon Jul 30 20:49:33 UTC 2018
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 470 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180730/228f3d31/attachment.bin>
-------------- next part --------------
From ca98f334ecaf4104609a1f5980ca92207857d05e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 12 Jul 2018 08:33:26 +0000
Subject: [PATCH 1/7] Use mattn's sqlite3 bindings in the lxd/db sub package
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/query/retry.go | 2 +-
lxd/db/schema/update.go | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/lxd/db/query/retry.go b/lxd/db/query/retry.go
index 65ef4636d..0c78a8c7f 100644
--- a/lxd/db/query/retry.go
+++ b/lxd/db/query/retry.go
@@ -4,8 +4,8 @@ import (
"strings"
"time"
- "github.com/CanonicalLtd/go-sqlite3"
"github.com/lxc/lxd/shared/logger"
+ "github.com/mattn/go-sqlite3"
"github.com/pkg/errors"
)
diff --git a/lxd/db/schema/update.go b/lxd/db/schema/update.go
index 121cd1c73..3a9da889d 100644
--- a/lxd/db/schema/update.go
+++ b/lxd/db/schema/update.go
@@ -7,7 +7,7 @@ import (
"path"
"runtime"
- _ "github.com/CanonicalLtd/go-sqlite3" // For opening the in-memory database
+ _ "github.com/mattn/go-sqlite3" // For opening the in-memory database
)
// DotGo writes '<name>.go' source file in the package of the calling function, containing
From e0b63cbeb48f15cb1b4c9619e74a6ba8e2698b14 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 12 Jul 2018 09:06:41 +0000
Subject: [PATCH 2/7] Drop go-1.6 code
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/query/slices.go | 19 +++++++++++++++++++
lxd/db/query/slices_1_6.go | 15 ---------------
lxd/db/query/slices_1_6_test.go | 18 ------------------
lxd/db/query/slices_1_8.go | 26 --------------------------
lxd/db/query/slices_1_8_test.go | 21 ---------------------
lxd/db/query/slices_test.go | 18 ++++++++++++++++++
6 files changed, 37 insertions(+), 80 deletions(-)
delete mode 100644 lxd/db/query/slices_1_6.go
delete mode 100644 lxd/db/query/slices_1_6_test.go
delete mode 100644 lxd/db/query/slices_1_8.go
delete mode 100644 lxd/db/query/slices_1_8_test.go
diff --git a/lxd/db/query/slices.go b/lxd/db/query/slices.go
index 6cd9a7934..c86deca42 100644
--- a/lxd/db/query/slices.go
+++ b/lxd/db/query/slices.go
@@ -104,3 +104,22 @@ func scanSingleColumn(tx *sql.Tx, query string, args []interface{}, typeName str
// Function to scan a single row.
type scanFunc func(*sql.Rows) error
+
+// Check that the given result set yields rows with a single column of a
+// specific type.
+func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error {
+ types, err := rows.ColumnTypes()
+ if err != nil {
+ return err
+ }
+ if len(types) != 1 {
+ return fmt.Errorf("query yields %d columns, not 1", len(types))
+ }
+
+ actualTypeName := strings.ToUpper(types[0].DatabaseTypeName())
+ if actualTypeName != typeName {
+ return fmt.Errorf("query yields %s column, not %s", actualTypeName, typeName)
+ }
+
+ return nil
+}
diff --git a/lxd/db/query/slices_1_6.go b/lxd/db/query/slices_1_6.go
deleted file mode 100644
index 652147b70..000000000
--- a/lxd/db/query/slices_1_6.go
+++ /dev/null
@@ -1,15 +0,0 @@
-// +build !go1.8
-
-package query
-
-import "database/sql"
-
-// Check that the given result set yields rows with a single column of a
-// specific type.
-func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error {
- // The Rows.ColumnTypes() method is available only since Go 1.8, so we
- // just return nil for <1.8. This is safe to do since if the returned
- // rows are not of the expected type, call sites will still fail at
- // Rows.Scan() time, although the error message will be less clear.
- return nil
-}
diff --git a/lxd/db/query/slices_1_6_test.go b/lxd/db/query/slices_1_6_test.go
deleted file mode 100644
index d508edecf..000000000
--- a/lxd/db/query/slices_1_6_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-// +build !go1.8
-
-package query_test
-
-var testStringsErrorCases = []struct {
- query string
- error string
-}{
- {"garbage", "near \"garbage\": syntax error"},
- {"SELECT id, name FROM test", "sql: expected 2 destination arguments in Scan, not 1"},
-}
-
-var testIntegersErrorCases = []struct {
- query string
- error string
-}{
- {"garbage", "near \"garbage\": syntax error"},
-}
diff --git a/lxd/db/query/slices_1_8.go b/lxd/db/query/slices_1_8.go
deleted file mode 100644
index 140de7c2a..000000000
--- a/lxd/db/query/slices_1_8.go
+++ /dev/null
@@ -1,26 +0,0 @@
-// +build go1.8
-
-package query
-
-import (
- "database/sql"
- "fmt"
- "strings"
-)
-
-// Check that the given result set yields rows with a single column of a
-// specific type.
-func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error {
- types, err := rows.ColumnTypes()
- if err != nil {
- return err
- }
- if len(types) != 1 {
- return fmt.Errorf("query yields %d columns, not 1", len(types))
- }
- actualTypeName := strings.ToUpper(types[0].DatabaseTypeName())
- if actualTypeName != typeName {
- return fmt.Errorf("query yields %s column, not %s", actualTypeName, typeName)
- }
- return nil
-}
diff --git a/lxd/db/query/slices_1_8_test.go b/lxd/db/query/slices_1_8_test.go
deleted file mode 100644
index dae33c2ab..000000000
--- a/lxd/db/query/slices_1_8_test.go
+++ /dev/null
@@ -1,21 +0,0 @@
-// +build go1.8
-
-package query_test
-
-var testStringsErrorCases = []struct {
- query string
- error string
-}{
- {"garbage", "near \"garbage\": syntax error"},
- {"SELECT id, name FROM test", "query yields 2 columns, not 1"},
- {"SELECT id FROM test", "query yields INTEGER column, not TEXT"},
-}
-
-var testIntegersErrorCases = []struct {
- query string
- error string
-}{
- {"garbage", "near \"garbage\": syntax error"},
- {"SELECT id, name FROM test", "query yields 2 columns, not 1"},
- {"SELECT name FROM test", "query yields TEXT column, not INTEGER"},
-}
diff --git a/lxd/db/query/slices_test.go b/lxd/db/query/slices_test.go
index 2b4a1cce2..3e028528b 100644
--- a/lxd/db/query/slices_test.go
+++ b/lxd/db/query/slices_test.go
@@ -23,6 +23,15 @@ func TestStrings_Error(t *testing.T) {
}
}
+var testStringsErrorCases = []struct {
+ query string
+ error string
+}{
+ {"garbage", "near \"garbage\": syntax error"},
+ {"SELECT id, name FROM test", "query yields 2 columns, not 1"},
+ {"SELECT id FROM test", "query yields INTEGER column, not TEXT"},
+}
+
// All values yield by the query are returned.
func TestStrings(t *testing.T) {
tx := newTxForSlices(t)
@@ -43,6 +52,15 @@ func TestIntegers_Error(t *testing.T) {
}
}
+var testIntegersErrorCases = []struct {
+ query string
+ error string
+}{
+ {"garbage", "near \"garbage\": syntax error"},
+ {"SELECT id, name FROM test", "query yields 2 columns, not 1"},
+ {"SELECT name FROM test", "query yields TEXT column, not INTEGER"},
+}
+
// All values yield by the query are returned.
func TestIntegers(t *testing.T) {
tx := newTxForSlices(t)
From 5e7425ff80ed064b6783c6b8c8aa4cb52be5db95 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 12 Jul 2018 12:10:59 +0000
Subject: [PATCH 3/7] Replace grpc-sql with dqlite custom protocol
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/cluster/open.go | 28 ++++++++------
lxd/db/db.go | 9 +++--
lxd/db/migration.go | 2 +-
lxd/db/node/sqlite.go | 2 +-
lxd/db/query/slices.go | 24 ------------
lxd/db/query/slices_test.go | 6 +--
lxd/db/schema/schema.go | 3 ++
lxd/db/testing.go | 90 +++++++++++++++++++++++++++++++--------------
8 files changed, 92 insertions(+), 72 deletions(-)
diff --git a/lxd/db/cluster/open.go b/lxd/db/cluster/open.go
index 3cb7c5c3e..20e456e20 100644
--- a/lxd/db/cluster/open.go
+++ b/lxd/db/cluster/open.go
@@ -6,7 +6,7 @@ import (
"path/filepath"
"sync/atomic"
- "github.com/CanonicalLtd/go-grpc-sql"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/lxc/lxd/lxd/db/query"
"github.com/lxc/lxd/lxd/db/schema"
"github.com/lxc/lxd/lxd/util"
@@ -23,18 +23,22 @@ import (
//
// The dialer argument is a function that returns a gRPC dialer that can be
// used to connect to a database node using the gRPC SQL package.
-func Open(name string, dialer grpcsql.Dialer) (*sql.DB, error) {
- driver := grpcsql.NewDriver(dialer)
- driverName := grpcSQLDriverName()
+func Open(name string, store dqlite.ServerStore, options ...dqlite.DriverOption) (*sql.DB, error) {
+ driver, err := dqlite.NewDriver(store, options...)
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to create dqlite driver")
+ }
+
+ driverName := dqliteDriverName()
sql.Register(driverName, driver)
- // Create the cluster db. This won't immediately establish any gRPC
+ // Create the cluster db. This won't immediately establish any network
// connection, that will happen only when a db transaction is started
// (see the database/sql connection pooling code for more details).
if name == "" {
name = "db.bin"
}
- db, err := sql.Open(driverName, name+"?_foreign_keys=1")
+ db, err := sql.Open(driverName, name)
if err != nil {
return nil, fmt.Errorf("cannot open cluster database: %v", err)
}
@@ -191,18 +195,18 @@ INSERT INTO profiles (name, description) VALUES ('default', 'Default LXD profile
return true, err
}
-// Generate a new name for the grpcsql driver registration. We need it to be
+// Generate a new name for the dqlite driver registration. We need it to be
// unique for testing, see below.
-func grpcSQLDriverName() string {
- defer atomic.AddUint64(&grpcSQLDriverSerial, 1)
- return fmt.Sprintf("grpc-%d", grpcSQLDriverSerial)
+func dqliteDriverName() string {
+ defer atomic.AddUint64(&dqliteDriverSerial, 1)
+ return fmt.Sprintf("dqlite-%d", dqliteDriverSerial)
}
-// Monotonic serial number for registering new instances of grpcsql.Driver
+// Monotonic serial number for registering new instances of dqlite.Driver
// using the database/sql stdlib package. This is needed since there's no way
// to unregister drivers, and in unit tests more than one driver gets
// registered.
-var grpcSQLDriverSerial uint64
+var dqliteDriverSerial uint64
func checkClusterIsUpgradable(tx *sql.Tx, target [2]int) error {
// Get the current versions in the nodes table.
diff --git a/lxd/db/db.go b/lxd/db/db.go
index c85ec9c55..c439ec19a 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -6,7 +6,7 @@ import (
"sync"
"time"
- "github.com/CanonicalLtd/go-grpc-sql"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/pkg/errors"
"golang.org/x/net/context"
@@ -59,6 +59,9 @@ func OpenNode(dir string, fresh func(*Node) error, legacyPatches map[int]*Legacy
return nil, nil, err
}
+ db.SetMaxOpenConns(1)
+ db.SetMaxIdleConns(1)
+
legacyHook := legacyPatchHook(db, legacyPatches)
hook := func(version int, tx *sql.Tx) error {
if version == node.UpdateFromPreClustering {
@@ -155,8 +158,8 @@ type Cluster struct {
// database matches our version, and possibly trigger a schema update. If the
// schema update can't be performed right now, because some nodes are still
// behind, an Upgrading error is returned.
-func OpenCluster(name string, dialer grpcsql.Dialer, address, dir string) (*Cluster, error) {
- db, err := cluster.Open(name, dialer)
+func OpenCluster(name string, store dqlite.ServerStore, address, dir string, options ...dqlite.DriverOption) (*Cluster, error) {
+ db, err := cluster.Open(name, store, options...)
if err != nil {
return nil, errors.Wrap(err, "failed to open database")
}
diff --git a/lxd/db/migration.go b/lxd/db/migration.go
index 1f06db1bd..ee769f53c 100644
--- a/lxd/db/migration.go
+++ b/lxd/db/migration.go
@@ -230,7 +230,7 @@ func importNodeAssociation(entity string, columns []string, row []interface{}, t
if id == 0 {
return fmt.Errorf("entity %s has invalid ID", entity)
}
- _, err := tx.Exec(stmt, row...)
+ _, err := tx.Exec(stmt, id)
if err != nil {
return errors.Wrapf(err, "failed to associate %s to node", entity)
}
diff --git a/lxd/db/node/sqlite.go b/lxd/db/node/sqlite.go
index 235b8a776..eecef80ab 100644
--- a/lxd/db/node/sqlite.go
+++ b/lxd/db/node/sqlite.go
@@ -4,7 +4,7 @@ import (
"database/sql"
"fmt"
- "github.com/CanonicalLtd/go-sqlite3"
+ "github.com/mattn/go-sqlite3"
)
func init() {
diff --git a/lxd/db/query/slices.go b/lxd/db/query/slices.go
index c86deca42..b62335203 100644
--- a/lxd/db/query/slices.go
+++ b/lxd/db/query/slices.go
@@ -83,11 +83,6 @@ func scanSingleColumn(tx *sql.Tx, query string, args []interface{}, typeName str
}
defer rows.Close()
- err = checkRowsHaveOneColumnOfSpecificType(rows, typeName)
- if err != nil {
- return err
- }
-
for rows.Next() {
err := scan(rows)
if err != nil {
@@ -104,22 +99,3 @@ func scanSingleColumn(tx *sql.Tx, query string, args []interface{}, typeName str
// Function to scan a single row.
type scanFunc func(*sql.Rows) error
-
-// Check that the given result set yields rows with a single column of a
-// specific type.
-func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error {
- types, err := rows.ColumnTypes()
- if err != nil {
- return err
- }
- if len(types) != 1 {
- return fmt.Errorf("query yields %d columns, not 1", len(types))
- }
-
- actualTypeName := strings.ToUpper(types[0].DatabaseTypeName())
- if actualTypeName != typeName {
- return fmt.Errorf("query yields %s column, not %s", actualTypeName, typeName)
- }
-
- return nil
-}
diff --git a/lxd/db/query/slices_test.go b/lxd/db/query/slices_test.go
index 3e028528b..6a37d8ba3 100644
--- a/lxd/db/query/slices_test.go
+++ b/lxd/db/query/slices_test.go
@@ -28,8 +28,7 @@ var testStringsErrorCases = []struct {
error string
}{
{"garbage", "near \"garbage\": syntax error"},
- {"SELECT id, name FROM test", "query yields 2 columns, not 1"},
- {"SELECT id FROM test", "query yields INTEGER column, not TEXT"},
+ {"SELECT id, name FROM test", "sql: expected 2 destination arguments in Scan, not 1"},
}
// All values yield by the query are returned.
@@ -57,8 +56,7 @@ var testIntegersErrorCases = []struct {
error string
}{
{"garbage", "near \"garbage\": syntax error"},
- {"SELECT id, name FROM test", "query yields 2 columns, not 1"},
- {"SELECT name FROM test", "query yields TEXT column, not INTEGER"},
+ {"SELECT id, name FROM test", "sql: expected 2 destination arguments in Scan, not 1"},
}
// All values yield by the query are returned.
diff --git a/lxd/db/schema/schema.go b/lxd/db/schema/schema.go
index 040ab6d7a..e576c7887 100644
--- a/lxd/db/schema/schema.go
+++ b/lxd/db/schema/schema.go
@@ -148,6 +148,7 @@ func (s *Schema) Ensure(db *sql.DB) (int, error) {
if err != nil {
return err
}
+
current, err = queryCurrentVersion(tx)
if err != nil {
return err
@@ -165,6 +166,7 @@ func (s *Schema) Ensure(db *sql.DB) (int, error) {
return err
}
}
+
// When creating the schema from scratch, use the fresh dump if
// available. Otherwise just apply all relevant updates.
if current == 0 && s.fresh != "" {
@@ -315,6 +317,7 @@ func queryCurrentVersion(tx *sql.Tx) (int, error) {
}
current = versions[len(versions)-1] // Highest recorded version
}
+
return current, nil
}
diff --git a/lxd/db/testing.go b/lxd/db/testing.go
index e2d77cb4d..18a59e303 100644
--- a/lxd/db/testing.go
+++ b/lxd/db/testing.go
@@ -1,17 +1,17 @@
package db
import (
+ "context"
+ "fmt"
"io/ioutil"
"net"
"os"
"testing"
- "time"
- "github.com/CanonicalLtd/go-grpc-sql"
- "github.com/CanonicalLtd/go-sqlite3"
- "github.com/lxc/lxd/lxd/util"
+ "github.com/CanonicalLtd/go-dqlite"
+ "github.com/CanonicalLtd/raft-test"
+ "github.com/hashicorp/raft"
"github.com/stretchr/testify/require"
- "google.golang.org/grpc"
)
// NewTestNode creates a new Node for testing purposes, along with a function
@@ -53,15 +53,23 @@ func NewTestNodeTx(t *testing.T) (*NodeTx, func()) {
// NewTestCluster creates a new Cluster for testing purposes, along with a function
// that can be used to clean it up when done.
func NewTestCluster(t *testing.T) (*Cluster, func()) {
- // Create an in-memory gRPC SQL server and dialer.
- server, dialer := newGrpcServer()
+ // Create an in-memory dqlite SQL server and associated store.
+ store, serverCleanup := newDqliteServer(t)
- cluster, err := OpenCluster(":memory:", dialer, "1", "/unused/db/dir")
+ log := newLogFunc(t)
+
+ dial := func(ctx context.Context, address string) (net.Conn, error) {
+ return net.Dial("unix", address)
+ }
+
+ cluster, err := OpenCluster(
+ "test.db", store, "1", "/unused/db/dir",
+ dqlite.WithLogFunc(log), dqlite.WithDialFunc(dial))
require.NoError(t, err)
cleanup := func() {
require.NoError(t, cluster.Close())
- server.Stop()
+ serverCleanup()
}
return cluster, cleanup
@@ -87,26 +95,54 @@ func NewTestClusterTx(t *testing.T) (*ClusterTx, func()) {
return clusterTx, cleanup
}
-// Create a new in-memory gRPC server attached to a grpc-sql gateway backed by a
-// SQLite driver.
+// Create a new in-memory dqlite server.
//
-// Return the newly created gRPC server and a dialer that can be used to
-// connect to it.
-func newGrpcServer() (*grpc.Server, grpcsql.Dialer) {
- listener, dial := util.InMemoryNetwork()
- server := grpcsql.NewServer(&sqlite3.SQLiteDriver{})
-
- // Setup an in-memory gRPC dialer.
- options := []grpc.DialOption{
- grpc.WithInsecure(),
- grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
- return dial(), nil
- }),
+// Return the server store that can be used to connect to it, plus a cleanup function.
+func newDqliteServer(t *testing.T) (*dqlite.DatabaseServerStore, func()) {
+ t.Helper()
+
+ listener, err := net.Listen("unix", "")
+ require.NoError(t, err)
+
+ address := listener.Addr().String()
+
+ store, err := dqlite.DefaultServerStore(":memory:")
+ require.NoError(t, err)
+ require.NoError(t, store.Set(context.Background(), []dqlite.ServerInfo{{Address: address}}))
+
+ id := fmt.Sprintf("%d", dqliteSerial)
+ dqliteSerial++
+ registry := dqlite.NewRegistry(id)
+
+ fsm := dqlite.NewFSM(registry)
+
+ r, raftCleanup := rafttest.Server(t, fsm, rafttest.Transport(func(i int) raft.Transport {
+ require.Equal(t, i, 0)
+ address := raft.ServerAddress(listener.Addr().String())
+ _, transport := raft.NewInmemTransport(address)
+ return transport
+ }))
+
+ log := newLogFunc(t)
+
+ server, err := dqlite.NewServer(
+ r, registry, listener, dqlite.WithServerLogFunc(log))
+ require.NoError(t, err)
+
+ cleanup := func() {
+ require.NoError(t, server.Close())
+ raftCleanup()
}
- dialer := func() (*grpc.ClientConn, error) {
- return grpc.Dial("", options...)
+
+ return store, cleanup
+}
+
+var dqliteSerial = 0
+
+func newLogFunc(t *testing.T) dqlite.LogFunc {
+ return func(l dqlite.LogLevel, format string, a ...interface{}) {
+ format = fmt.Sprintf("%s: %s", l.String(), format)
+ t.Logf(format, a...)
}
- go server.Serve(listener)
- return server, dialer
}
From cd246474eca2f10dc8fd81619a133a34755eed68 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Fri, 13 Jul 2018 12:19:34 +0000
Subject: [PATCH 4/7] Wire dqlite server
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 292 ++++++++++++++++++++++++++++-------------
lxd/cluster/gateway_test.go | 14 +-
lxd/cluster/heartbeat.go | 2 +-
lxd/cluster/heartbeat_test.go | 6 +-
lxd/cluster/membership.go | 5 +-
lxd/cluster/membership_test.go | 36 +++--
lxd/cluster/raft.go | 10 +-
lxd/cluster/raft_test.go | 6 +-
lxd/cluster/upgrade.go | 2 +-
9 files changed, 255 insertions(+), 118 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index b37b6db1f..34b9b4f0c 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -1,7 +1,10 @@
package cluster
import (
+ "bufio"
+ "crypto/tls"
"fmt"
+ "io"
"net"
"net/http"
"net/url"
@@ -10,17 +13,15 @@ import (
"strconv"
"time"
- "github.com/CanonicalLtd/dqlite"
- "github.com/CanonicalLtd/go-grpc-sql"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/hashicorp/raft"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/eagain"
"github.com/lxc/lxd/shared/logger"
"github.com/pkg/errors"
"golang.org/x/net/context"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
)
// NewGateway creates a new Gateway for managing access to the dqlite cluster.
@@ -49,6 +50,8 @@ func NewGateway(db *db.Node, cert *shared.CertInfo, options ...Option) (*Gateway
ctx: ctx,
cancel: cancel,
upgradeCh: make(chan struct{}, 16),
+ acceptCh: make(chan net.Conn),
+ store: &dqliteServerStore{},
}
err := gateway.init()
@@ -74,15 +77,16 @@ type Gateway struct {
// The gRPC server exposing the dqlite driver created by this
// gateway. It's nil if this LXD node is not supposed to be part of the
// raft cluster.
- server *grpc.Server
+ server *dqlite.Server
+ acceptCh chan net.Conn
- // A dialer that will connect to the gRPC server using an in-memory
+ // A dialer that will connect to the dqlite server using a loopback
// net.Conn. It's non-nil when clustering is not enabled on this LXD
// node, and so we don't expose any dqlite or raft network endpoint,
// but still we want to use dqlite as backend for the "cluster"
// database, to minimize the difference between code paths in
// clustering and non-clustering modes.
- memoryDial func() (*grpc.ClientConn, error)
+ memoryDial dqlite.DialFunc
// Used when shutting down the daemon to cancel any ongoing gRPC
// dialing attempt.
@@ -92,6 +96,9 @@ type Gateway struct {
// Used to unblock nodes that are waiting for other nodes to upgrade
// their version.
upgradeCh chan struct{}
+
+ // ServerStore wrapper.
+ store *dqliteServerStore
}
// HandlerFuncs returns the HTTP handlers that should be added to the REST API
@@ -104,7 +111,7 @@ type Gateway struct {
// non-clustered node not available over the network or because it is not a
// database node part of the dqlite cluster.
func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc {
- grpc := func(w http.ResponseWriter, r *http.Request) {
+ database := func(w http.ResponseWriter, r *http.Request) {
if !tlsCheckCert(r, g.cert) {
http.Error(w, "403 invalid client certificate", http.StatusForbidden)
return
@@ -166,7 +173,33 @@ func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc {
return
}
- g.server.ServeHTTP(w, r)
+ if r.Header.Get("Upgrade") != "dqlite" {
+ http.Error(w, "missing or invalid upgrade header", http.StatusBadRequest)
+ return
+ }
+
+ hijacker, ok := w.(http.Hijacker)
+ if !ok {
+ http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError)
+ return
+ }
+
+ conn, _, err := hijacker.Hijack()
+ if err != nil {
+ message := errors.Wrap(err, "failed to hijack connection").Error()
+ http.Error(w, message, http.StatusInternalServerError)
+ return
+ }
+
+ // Write the status line and upgrade header by hand since w.WriteHeader()
+ // would fail after Hijack()
+ data := []byte("HTTP/1.1 101 Switching Protocols\r\nUpgrade: dqlite\r\n\r\n")
+ if n, err := conn.Write(data); err != nil || n != len(data) {
+ conn.Close()
+ return
+ }
+
+ g.acceptCh <- conn
}
raft := func(w http.ResponseWriter, r *http.Request) {
// If we are not part of the raft cluster, reply with a
@@ -205,8 +238,8 @@ func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc {
}
return map[string]http.HandlerFunc{
- grpcEndpoint: grpc,
- raftEndpoint: raft,
+ databaseEndpoint: database,
+ raftEndpoint: raft,
}
}
@@ -222,47 +255,34 @@ func (g *Gateway) IsDatabaseNode() bool {
return g.raft != nil
}
-// Dialer returns a gRPC dial function that can be used to connect to one of
-// the dqlite nodes via gRPC.
-func (g *Gateway) Dialer() grpcsql.Dialer {
- return func() (*grpc.ClientConn, error) {
+// DialFunc returns a dial function that can be used to connect to one of the
+// dqlite nodes.
+func (g *Gateway) DialFunc() dqlite.DialFunc {
+ return func(ctx context.Context, address string) (net.Conn, error) {
// Memory connection.
if g.memoryDial != nil {
- return g.memoryDial()
+ return g.memoryDial(ctx, address)
}
- // TODO: should the timeout be configurable?
- ctx, cancel := context.WithTimeout(g.ctx, 10*time.Second)
- defer cancel()
- var err error
- for {
- // Network connection.
- addresses, dbErr := g.cachedRaftNodes()
- if dbErr != nil {
- return nil, dbErr
- }
-
- for _, address := range addresses {
- var conn *grpc.ClientConn
- conn, err = grpcNetworkDial(g.ctx, address, g.cert)
- if err == nil {
- return conn, nil
- }
- logger.Debugf("Failed to establish gRPC connection with %s: %v", address, err)
- }
- if ctx.Err() != nil {
- return nil, ctx.Err()
- }
- select {
- case <-time.After(250 * time.Millisecond):
- continue
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
+ return dqliteNetworkDial(ctx, address, g.cert)
}
}
+// Context returns a cancellation context to pass to dqlite.NewDriver as an
+// option.
+//
+// This context gets cancelled by Gateway.Kill() and at that point any
+// connection failure won't be retried.
+func (g *Gateway) Context() context.Context {
+ return g.ctx
+}
+
+// ServerStore returns a dqlite server store that can be used to look up the
+// addresses of known database nodes.
+func (g *Gateway) ServerStore() dqlite.ServerStore {
+ return g.store
+}
+
// Kill is an API that the daemon calls before it actually shuts down and calls
// Shutdown(). It will abort any ongoing or new attempt to establish a SQL gRPC
// connection with the dialer (typically for running some pre-shutdown
@@ -275,16 +295,32 @@ func (g *Gateway) Kill() {
// Shutdown this gateway, stopping the gRPC server and possibly the raft factory.
func (g *Gateway) Shutdown() error {
logger.Info("Stop database gateway")
+
+ if g.raft != nil {
+ err := g.raft.Shutdown()
+ if err != nil {
+ return errors.Wrap(err, "failed to shutdown raft")
+ }
+ }
+
if g.server != nil {
- g.server.Stop()
+ // Dump the content of the database to disk, so the
+ // activateifneeded command can inspect it in order to decide
+ // whether to activate the daemon or not.
+ dir := filepath.Join(g.db.Dir(), "global")
+ err := g.server.Dump("db.bin", dir)
+ if err != nil {
+ // Just log a warning, since this is not fatal.
+ logger.Warnf("Failed to dump database to disk: %v", err)
+ }
+
+ g.server.Close()
// Unset the memory dial, since Shutdown() is also called for
// switching between in-memory and network mode.
g.memoryDial = nil
}
- if g.raft == nil {
- return nil
- }
- return g.raft.Shutdown()
+
+ return nil
}
// Reset the gateway, shutting it down and starting against from scratch using
@@ -363,7 +399,7 @@ func (g *Gateway) LeaderAddress() (string, error) {
}
for _, address := range addresses {
- url := fmt.Sprintf("https://%s%s", address, grpcEndpoint)
+ url := fmt.Sprintf("https://%s%s", address, databaseEndpoint)
request, err := http.NewRequest("GET", url, nil)
if err != nil {
return "", err
@@ -409,20 +445,26 @@ func (g *Gateway) init() error {
// should serve as database node, so create a dqlite driver to be
// exposed it over gRPC.
if raft != nil {
- config := dqlite.DriverConfig{}
- driver, err := dqlite.NewDriver(raft.Registry(), raft.Raft(), config)
+ listener, err := net.Listen("unix", "")
if err != nil {
- return errors.Wrap(err, "failed to create dqlite driver")
+ return errors.Wrap(err, "failed to allocate loopback port")
}
- server := grpcsql.NewServer(driver)
+
if raft.HandlerFunc() == nil {
- // If no raft http handler is set, it means we are in
- // single node mode and we don't have a network
- // endpoint, so let's spin up a fully in-memory gRPC
- // server.
- listener, dial := util.InMemoryNetwork()
- go server.Serve(listener)
- g.memoryDial = grpcMemoryDial(dial)
+ g.memoryDial = dqliteMemoryDial(listener)
+ g.store.inMemory = dqlite.NewInmemServerStore()
+ g.store.Set(context.Background(), []dqlite.ServerInfo{{Address: "0"}})
+ } else {
+ go runDqliteProxy(listener, g.acceptCh)
+ g.store.inMemory = nil
+ }
+
+ provider := &raftAddressProvider{db: g.db}
+ server, err := dqlite.NewServer(
+ raft.Raft(), raft.Registry(), listener,
+ dqlite.WithServerAddressProvider(provider))
+ if err != nil {
+ return errors.Wrap(err, "failed to create dqlite server")
}
g.server = server
@@ -430,7 +472,11 @@ func (g *Gateway) init() error {
} else {
g.server = nil
g.raft = nil
+ g.store.inMemory = nil
}
+
+ g.store.onDisk = dqlite.NewServerStore(g.db.DB(), "main", "raft_nodes", "address")
+
return nil
}
@@ -500,21 +546,15 @@ func (g *Gateway) cachedRaftNodes() ([]string, error) {
return addresses, nil
}
-func grpcNetworkDial(ctx context.Context, addr string, cert *shared.CertInfo) (*grpc.ClientConn, error) {
+func dqliteNetworkDial(ctx context.Context, addr string, cert *shared.CertInfo) (net.Conn, error) {
config, err := tlsClientConfig(cert)
if err != nil {
return nil, err
}
- // The whole attempt should not take more than a few seconds. If the
- // context gets cancelled, calling code will typically try against
- // another database node, in round robin.
- ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
- defer cancel()
-
// Make a probe HEAD request to check if the target node is the leader.
- url := fmt.Sprintf("https://%s%s", addr, grpcEndpoint)
- request, err := http.NewRequest("HEAD", url, nil)
+ path := fmt.Sprintf("https://%s%s", addr, databaseEndpoint)
+ request, err := http.NewRequest("HEAD", path, nil)
if err != nil {
return nil, err
}
@@ -524,36 +564,68 @@ func grpcNetworkDial(ctx context.Context, addr string, cert *shared.CertInfo) (*
if err != nil {
return nil, err
}
+
+ // If the endpoint does not exist, it means that the target node is
+ // running version 1 of the dqlite protocol. In that case we simply behave
+ // as if the node was at an older LXD version.
+ if response.StatusCode == http.StatusNotFound {
+ return nil, db.ErrSomeNodesAreBehind
+ }
+
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf(response.Status)
}
- options := []grpc.DialOption{
- grpc.WithTransportCredentials(credentials.NewTLS(config)),
+ // Establish the connection
+ request = &http.Request{
+ Method: "POST",
+ Proto: "HTTP/1.1",
+ ProtoMajor: 1,
+ ProtoMinor: 1,
+ Header: make(http.Header),
+ Host: addr,
}
- return grpc.DialContext(ctx, addr, options...)
-}
+ request.URL, err = url.Parse(path)
+ if err != nil {
+ return nil, err
+ }
+
+ request.Header.Set("Upgrade", "dqlite")
+ request = request.WithContext(ctx)
+
+ deadline, _ := ctx.Deadline()
+ dialer := &net.Dialer{Timeout: time.Until(deadline)}
+
+ conn, err := tls.DialWithDialer(dialer, "tcp", addr, config)
-// Convert a raw in-memory dial function into a gRPC one.
-func grpcMemoryDial(dial func() net.Conn) func() (*grpc.ClientConn, error) {
- options := []grpc.DialOption{
- grpc.WithInsecure(),
- grpc.WithBlock(),
- grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
- return dial(), nil
- }),
+ if err := request.Write(conn); err != nil {
+ return nil, errors.Wrap(err, "sending HTTP request failed")
}
- return func() (*grpc.ClientConn, error) {
- return grpc.Dial("", options...)
+
+ response, err = http.ReadResponse(bufio.NewReader(conn), request)
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to read response")
}
+ if response.StatusCode != http.StatusSwitchingProtocols {
+ return nil, fmt.Errorf("dialing fail: expected status code 101 got %d", response.StatusCode)
+ }
+ if response.Header.Get("Upgrade") != "dqlite" {
+ return nil, fmt.Errorf("missing or unexpected Upgrade header in response")
+ }
+
+ return conn, err
}
-// The LXD API endpoint path that gets routed to a gRPC server handler for
-// performing SQL queries against the dqlite driver running on this node.
-//
-// FIXME: figure out if there's a way to configure the gRPC client to add a
-// prefix to this url, e.g. /internal/db/protocol.SQL/Conn.
-const grpcEndpoint = "/protocol.SQL/Conn"
+// Create a dial function that connects to the given listener.
+func dqliteMemoryDial(listener net.Listener) dqlite.DialFunc {
+ return func(ctx context.Context, address string) (net.Conn, error) {
+ return net.Dial("unix", listener.Addr().String())
+ }
+}
+
+// The LXD API endpoint path that gets routed to a dqlite server handler for
+// performing SQL queries against the dqlite server running on this node.
+const databaseEndpoint = "/internal/database"
// Redirect dqlite's logs to our own logger
func dqliteLog(configuredLevel string) func(level, message string) {
@@ -582,3 +654,41 @@ func dqliteLog(configuredLevel string) func(level, message string) {
}
}
}
+
+// runDqliteProxy forwards every connection received on acceptCh to the
+// address of the given listener.
+//
+// For each incoming connection a new "unix" connection to the listener's
+// address is opened, and two goroutines copy the bytes between the two
+// connections, one per direction. Each goroutine closes the connection it
+// was reading from once its copy terminates.
+//
+// The function loops forever and never returns.
+func runDqliteProxy(listener net.Listener, acceptCh chan net.Conn) {
+	for {
+		src := <-acceptCh
+		dst, err := net.Dial("unix", listener.Addr().String())
+		if err != nil {
+			// Failing to reach the listener must not take the whole
+			// process down: drop the incoming connection, so the
+			// peer notices the failure, and keep serving the
+			// connections that follow.
+			src.Close()
+			continue
+		}
+		go func() {
+			io.Copy(eagain.Writer{Writer: dst}, eagain.Reader{Reader: src})
+			src.Close()
+		}()
+		go func() {
+			io.Copy(eagain.Writer{Writer: src}, eagain.Reader{Reader: dst})
+			dst.Close()
+		}()
+	}
+}
+
+// dqliteServerStore is a server store that delegates to one of two underlying
+// stores: the in-memory one when it is set, the on-disk one otherwise.
+type dqliteServerStore struct {
+	inMemory dqlite.ServerStore // Takes precedence whenever it is non-nil
+	onDisk   dqlite.ServerStore // Used when inMemory is nil
+}
+
+// store returns the underlying store that calls should be delegated to.
+func (s *dqliteServerStore) store() dqlite.ServerStore {
+	if s.inMemory == nil {
+		return s.onDisk
+	}
+	return s.inMemory
+}
+
+// Get delegates to the Get method of the selected underlying store.
+func (s *dqliteServerStore) Get(ctx context.Context) ([]dqlite.ServerInfo, error) {
+	return s.store().Get(ctx)
+}
+
+// Set delegates to the Set method of the selected underlying store.
+func (s *dqliteServerStore) Set(ctx context.Context, servers []dqlite.ServerInfo) error {
+	return s.store().Set(ctx, servers)
+}
diff --git a/lxd/cluster/gateway_test.go b/lxd/cluster/gateway_test.go
index 71ed46913..48fa86c4b 100644
--- a/lxd/cluster/gateway_test.go
+++ b/lxd/cluster/gateway_test.go
@@ -10,7 +10,7 @@ import (
"path/filepath"
"testing"
- "github.com/CanonicalLtd/go-grpc-sql"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/hashicorp/raft"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
@@ -18,6 +18,7 @@ import (
"github.com/lxc/lxd/shared/logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "golang.org/x/net/context"
)
// Basic creation and shutdown. By default, the gateway runs an in-memory gRPC
@@ -44,8 +45,8 @@ func TestGateway_Single(t *testing.T) {
assert.Equal(t, 404, w.Code, endpoint)
}
- dialer := gateway.Dialer()
- conn, err := dialer()
+ dial := gateway.DialFunc()
+ conn, err := dial(context.Background(), "")
assert.NoError(t, err)
assert.NotNil(t, conn)
@@ -66,7 +67,7 @@ func TestGateway_SingleWithNetworkAddress(t *testing.T) {
defer server.Close()
address := server.Listener.Addr().String()
- setRaftRole(t, db, address)
+ store := setRaftRole(t, db, address)
gateway := newGateway(t, db, cert)
defer gateway.Shutdown()
@@ -75,9 +76,12 @@ func TestGateway_SingleWithNetworkAddress(t *testing.T) {
mux.HandleFunc(path, handler)
}
- driver := grpcsql.NewDriver(gateway.Dialer())
+ driver, err := dqlite.NewDriver(store, dqlite.WithDialFunc(gateway.DialFunc()))
+ require.NoError(t, err)
+
conn, err := driver.Open("test.db")
require.NoError(t, err)
+
require.NoError(t, conn.Close())
leader, err := gateway.LeaderAddress()
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index 67a27ff58..986505649 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -141,7 +141,7 @@ func heartbeatNode(taskCtx context.Context, address string, cert *shared.CertInf
if err != nil {
return err
}
- url := fmt.Sprintf("https://%s%s", address, grpcEndpoint)
+ url := fmt.Sprintf("https://%s%s", address, databaseEndpoint)
client := &http.Client{Transport: &http.Transport{TLSClientConfig: config}}
buffer := bytes.Buffer{}
diff --git a/lxd/cluster/heartbeat_test.go b/lxd/cluster/heartbeat_test.go
index b1def2ed1..b6d698ad5 100644
--- a/lxd/cluster/heartbeat_test.go
+++ b/lxd/cluster/heartbeat_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"time"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/hashicorp/raft"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
@@ -249,7 +250,10 @@ func (f *heartbeatFixture) node() (*state.State, *cluster.Gateway, string) {
var err error
require.NoError(f.t, state.Cluster.Close())
- state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir")
+ store := gateway.ServerStore()
+ dial := gateway.DialFunc()
+ state.Cluster, err = db.OpenCluster(
+ "db.bin", store, address, "/unused/db/dir", dqlite.WithDialFunc(dial))
require.NoError(f.t, err)
f.gateways[len(f.gateways)] = gateway
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index bbe80c92d..564a374c0 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -85,8 +85,8 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error {
return err
}
- // Shutdown the gateway. This will trash any gRPC SQL connection
- // against our in-memory dqlite driver and shutdown the associated raft
+ // Shutdown the gateway. This will trash any dqlite connection against
+ // our in-memory dqlite driver and shutdown the associated raft
// instance. We also lock regular access to the cluster database since
// we don't want any other database code to run while we're
// reconfiguring raft.
@@ -107,6 +107,7 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error {
if err != nil {
return errors.Wrap(err, "failed to re-initialize gRPC SQL gateway")
}
+
err = gateway.waitLeadership()
if err != nil {
return err
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index b1cbc7c86..97ac52db9 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -7,7 +7,7 @@ import (
"path/filepath"
"testing"
- "github.com/CanonicalLtd/go-grpc-sql"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/state"
@@ -123,11 +123,6 @@ func TestBootstrap(t *testing.T) {
mux.HandleFunc(path, handler)
}
- driver := grpcsql.NewDriver(gateway.Dialer())
- conn, err := driver.Open("test.db")
- require.NoError(t, err)
- require.NoError(t, conn.Close())
-
count, err := cluster.Count(state)
require.NoError(t, err)
assert.Equal(t, 1, count)
@@ -254,15 +249,24 @@ func TestJoin(t *testing.T) {
}
targetAddress := targetServer.Listener.Addr().String()
- var err error
+
require.NoError(t, targetState.Cluster.Close())
- targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress, "/unused/db/dir")
+
+ targetStore := targetGateway.ServerStore()
+ targetDialFunc := targetGateway.DialFunc()
+
+ var err error
+ targetState.Cluster, err = db.OpenCluster(
+ "db.bin", targetStore, targetAddress, "/unused/db/dir",
+ dqlite.WithDialFunc(targetDialFunc))
require.NoError(t, err)
+
targetF := &membershipFixtures{t: t, state: targetState}
targetF.NetworkAddress(targetAddress)
err = cluster.Bootstrap(targetState, targetGateway, "buzz")
require.NoError(t, err)
+ _, err = targetState.Cluster.Networks()
// Setup a joining node
mux := http.NewServeMux()
@@ -282,8 +286,14 @@ func TestJoin(t *testing.T) {
}
address := server.Listener.Addr().String()
+
require.NoError(t, state.Cluster.Close())
- state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir")
+
+ store := gateway.ServerStore()
+ dialFunc := gateway.DialFunc()
+
+ state.Cluster, err = db.OpenCluster(
+ "db.bin", store, address, "/unused/db/dir", dqlite.WithDialFunc(dialFunc))
require.NoError(t, err)
f := &membershipFixtures{t: t, state: state}
@@ -368,7 +378,10 @@ func FLAKY_TestPromote(t *testing.T) {
targetAddress := targetServer.Listener.Addr().String()
var err error
require.NoError(t, targetState.Cluster.Close())
- targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress, "/unused/db/dir")
+ store := targetGateway.ServerStore()
+ dialFunc := targetGateway.DialFunc()
+ targetState.Cluster, err = db.OpenCluster(
+ "db.bin", store, targetAddress, "/unused/db/dir", dqlite.WithDialFunc(dialFunc))
require.NoError(t, err)
targetF := &membershipFixtures{t: t, state: targetState}
targetF.NetworkAddress(targetAddress)
@@ -397,9 +410,6 @@ func FLAKY_TestPromote(t *testing.T) {
mux.HandleFunc(path, handler)
}
- state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir")
- require.NoError(t, err)
-
// Promote the node.
targetF.RaftNode(address) // Add the address of the node to be promoted in the leader's db
raftNodes := targetF.RaftNodes()
diff --git a/lxd/cluster/raft.go b/lxd/cluster/raft.go
index 742fa26fd..ca9fc4b32 100644
--- a/lxd/cluster/raft.go
+++ b/lxd/cluster/raft.go
@@ -14,7 +14,7 @@ import (
"strings"
"time"
- "github.com/CanonicalLtd/dqlite"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/CanonicalLtd/raft-http"
"github.com/CanonicalLtd/raft-membership"
"github.com/boltdb/bolt"
@@ -183,7 +183,8 @@ func raftInstanceInit(
}
// The dqlite registry and FSM.
- registry := dqlite.NewRegistry(dir)
+ registry := dqlite.NewRegistry(strconv.Itoa(serial))
+ serial++
fsm := dqlite.NewFSM(registry)
// The actual raft instance.
@@ -214,6 +215,8 @@ func raftInstanceInit(
return instance, nil
}
+var serial = 99
+
// Registry returns the dqlite Registry associated with the raft instance.
func (i *raftInstance) Registry() *dqlite.Registry {
return i.registry
@@ -280,7 +283,8 @@ func (i *raftInstance) Shutdown() error {
errCh := make(chan error)
timer := time.After(timeout)
go func() {
- errCh <- i.raft.Snapshot().Error()
+ //errCh <- i.raft.Snapshot().Error()
+ errCh <- nil
}()
// In case of error we just log a warning, since this is not really
// fatal.
diff --git a/lxd/cluster/raft_test.go b/lxd/cluster/raft_test.go
index 9e6cfb983..223127da5 100644
--- a/lxd/cluster/raft_test.go
+++ b/lxd/cluster/raft_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"time"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/CanonicalLtd/raft-test"
"github.com/hashicorp/raft"
"github.com/lxc/lxd/lxd/cluster"
@@ -122,7 +123,7 @@ func newRaft(t *testing.T, db *db.Node, cert *shared.CertInfo) *cluster.RaftInst
// address into the raft_nodes table.
//
// This effectively makes the node act as a database raft node.
-func setRaftRole(t *testing.T, database *db.Node, address string) {
+func setRaftRole(t *testing.T, database *db.Node, address string) *dqlite.DatabaseServerStore {
require.NoError(t, database.Transaction(func(tx *db.NodeTx) error {
err := tx.UpdateConfig(map[string]string{"core.https_address": address})
if err != nil {
@@ -131,6 +132,9 @@ func setRaftRole(t *testing.T, database *db.Node, address string) {
_, err = tx.RaftNodeAdd(address)
return err
}))
+
+ store := dqlite.NewServerStore(database.DB(), "main", "raft_nodes", "address")
+ return store
}
// Create a new test HTTP server configured with the given TLS certificate and
diff --git a/lxd/cluster/upgrade.go b/lxd/cluster/upgrade.go
index 710aff7bd..ce6fcb4bd 100644
--- a/lxd/cluster/upgrade.go
+++ b/lxd/cluster/upgrade.go
@@ -25,7 +25,7 @@ func NotifyUpgradeCompleted(state *state.State, cert *shared.CertInfo) error {
return errors.Wrap(err, "failed to get connection info")
}
- url := fmt.Sprintf("%s%s", info.Addresses[0], grpcEndpoint)
+ url := fmt.Sprintf("%s%s", info.Addresses[0], databaseEndpoint)
request, err := http.NewRequest("PATCH", url, nil)
if err != nil {
return errors.Wrap(err, "failed to create database notify upgrade request")
From 126b0cc2dfcec8b83dbaa36f62d755173c210860 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Fri, 13 Jul 2018 12:20:06 +0000
Subject: [PATCH 5/7] Adapt main package to new cluster sub-package API
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
Makefile | 2 +-
lxd/api_cluster.go | 8 ++++++-
lxd/api_cluster_test.go | 1 +
lxd/daemon.go | 19 +++++++++++-----
lxd/daemon_integration_test.go | 7 ++++--
lxd/main_activateifneeded.go | 4 ++--
lxd/response.go | 2 +-
shared/logging/testing.go | 2 +-
test/includes/lxd.sh | 51 ++++++++++++++++++++++--------------------
test/suites/basic.sh | 6 +++++
test/suites/clustering.sh | 4 ++--
11 files changed, 67 insertions(+), 39 deletions(-)
diff --git a/Makefile b/Makefile
index 76bdb33d4..7efb277fb 100644
--- a/Makefile
+++ b/Makefile
@@ -9,7 +9,7 @@ POTFILE=po/$(DOMAIN).pot
# TODO: use git describe for versioning
VERSION=$(shell grep "var Version" shared/version/flex.go | cut -d'"' -f2)
ARCHIVE=lxd-$(VERSION).tar
-TAGS=$(shell printf "\#include <sqlite3.h>\nvoid main(){int n = SQLITE_CONFIG_REPLICATION;}" | $(CC) -o /dev/null -xc - >/dev/null 2>&1 && echo "-tags libsqlite3")
+TAGS=$(shell printf "\#include <sqlite3.h>\nvoid main(){int n = SQLITE_IOERR_NOT_LEADER;}" | $(CC) -o /dev/null -xc - >/dev/null 2>&1 && echo "-tags libsqlite3")
.PHONY: default
default:
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 89f2b5499..0bcfda56e 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -9,6 +9,7 @@ import (
"path/filepath"
"strconv"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/gorilla/mux"
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/lxd/cluster"
@@ -469,7 +470,12 @@ func clusterPutDisable(d *Daemon) Response {
if err != nil {
return SmartError(err)
}
- d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address, "/unused/db/dir")
+ store := d.gateway.ServerStore()
+ d.cluster, err = db.OpenCluster(
+ "db.bin", store, address, "/unused/db/dir",
+ dqlite.WithDialFunc(d.gateway.DialFunc()),
+ dqlite.WithContext(d.gateway.Context()),
+ )
if err != nil {
return SmartError(err)
}
diff --git a/lxd/api_cluster_test.go b/lxd/api_cluster_test.go
index a8411e3dd..8e7bea1a6 100644
--- a/lxd/api_cluster_test.go
+++ b/lxd/api_cluster_test.go
@@ -109,6 +109,7 @@ func TestCluster_Join(t *testing.T) {
// Make the second node join the cluster.
f.RegisterCertificate(daemons[1], daemons[0], "rusp", "sekret")
+
address := daemons[0].endpoints.NetworkAddress()
cert := string(daemons[0].endpoints.NetworkPublicKey())
client = f.ClientUnix(daemons[1])
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 31c7bd0d1..451cc28f8 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -17,6 +17,7 @@ import (
"time"
"github.com/CanonicalLtd/candidclient"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"golang.org/x/net/context"
@@ -76,9 +77,10 @@ type externalAuth struct {
// DaemonConfig holds configuration values for Daemon.
type DaemonConfig struct {
- Group string // Group name the local unix socket should be chown'ed to
- Trace []string // List of sub-systems to trace
- RaftLatency float64 // Coarse grain measure of the cluster latency
+ Group string // Group name the local unix socket should be chown'ed to
+ Trace []string // List of sub-systems to trace
+ RaftLatency float64 // Coarse grain measure of the cluster latency
+ DqliteSetupTimeout time.Duration // How long to wait for the cluster database to be up
}
// NewDaemon returns a new Daemon object with the given configuration.
@@ -95,7 +97,8 @@ func NewDaemon(config *DaemonConfig, os *sys.OS) *Daemon {
// DefaultDaemonConfig returns a DaemonConfig object with default values/
func DefaultDaemonConfig() *DaemonConfig {
return &DaemonConfig{
- RaftLatency: 3.0,
+ RaftLatency: 3.0,
+ DqliteSetupTimeout: 36 * time.Hour, // Account for snap refresh lag
}
}
@@ -469,7 +472,13 @@ func (d *Daemon) init() error {
for {
logger.Info("Initializing global database")
dir := filepath.Join(d.os.VarDir, "database")
- d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address, dir)
+ store := d.gateway.ServerStore()
+ d.cluster, err = db.OpenCluster(
+ "db.bin", store, address, dir,
+ dqlite.WithDialFunc(d.gateway.DialFunc()),
+ dqlite.WithContext(d.gateway.Context()),
+ dqlite.WithConnectionTimeout(d.config.DqliteSetupTimeout),
+ )
if err == nil {
break
}
diff --git a/lxd/daemon_integration_test.go b/lxd/daemon_integration_test.go
index d1b113171..aad69f162 100644
--- a/lxd/daemon_integration_test.go
+++ b/lxd/daemon_integration_test.go
@@ -2,6 +2,7 @@ package main
import (
"testing"
+ "time"
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/lxd/sys"
@@ -40,7 +41,7 @@ func newDaemon(t *testing.T) (*Daemon, func()) {
require.NoError(t, daemon.Init())
cleanup := func() {
- require.NoError(t, daemon.Stop())
+ daemon.Stop()
osCleanup()
resetLogger()
}
@@ -76,6 +77,8 @@ func newDaemons(t *testing.T, n int) ([]*Daemon, func()) {
// Create a new DaemonConfig object for testing purposes.
func newConfig() *DaemonConfig {
return &DaemonConfig{
- RaftLatency: 0.8,
+ RaftLatency: 0.8,
+ Trace: []string{"dqlite"},
+ DqliteSetupTimeout: 10 * time.Second,
}
}
diff --git a/lxd/main_activateifneeded.go b/lxd/main_activateifneeded.go
index 708eb57ab..b7a892741 100644
--- a/lxd/main_activateifneeded.go
+++ b/lxd/main_activateifneeded.go
@@ -5,7 +5,7 @@ import (
"fmt"
"os"
- "github.com/CanonicalLtd/go-sqlite3"
+ "github.com/mattn/go-sqlite3"
"github.com/spf13/cobra"
"github.com/lxc/lxd/client"
@@ -71,7 +71,7 @@ func (c *cmdActivateifneeded) Run(cmd *cobra.Command, args []string) error {
}
d.db = db.ForLegacyPatches(sqldb)
- /* Load the configured address the database */
+ // Load the configured address from the database
address, err := node.HTTPSAddress(d.db)
if err != nil {
return err
diff --git a/lxd/response.go b/lxd/response.go
index d01af2850..256341759 100644
--- a/lxd/response.go
+++ b/lxd/response.go
@@ -11,7 +11,7 @@ import (
"os"
"time"
- "github.com/CanonicalLtd/go-sqlite3"
+ "github.com/mattn/go-sqlite3"
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/lxd/cluster"
diff --git a/shared/logging/testing.go b/shared/logging/testing.go
index 22c3a9a90..bad3375c9 100644
--- a/shared/logging/testing.go
+++ b/shared/logging/testing.go
@@ -34,6 +34,6 @@ func (h *testingHandler) Log(r *log.Record) error {
}
h.t.Logf("%s %s %s%s", r.Time.Format("15:04:05.000"), r.Lvl, r.Msg, ctx)
- //fmt.Printf("%s %s %s%s\n", r.Time.Format("15:04:05.000"), r.Lvl, r.Msg, ctx)
+
return nil
}
diff --git a/test/includes/lxd.sh b/test/includes/lxd.sh
index f424a4811..ee5485822 100644
--- a/test/includes/lxd.sh
+++ b/test/includes/lxd.sh
@@ -211,30 +211,33 @@ kill_lxd() {
check_empty "${daemon_dir}/shmounts/"
check_empty "${daemon_dir}/snapshots/"
- echo "==> Checking for leftover cluster DB entries"
- # FIXME: we should not use the command line sqlite client, since it's
- # not compatible with dqlite
- check_empty_table "${daemon_dir}/database/global/db.bin" "containers"
- check_empty_table "${daemon_dir}/database/global/db.bin" "containers_config"
- check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices"
- check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices_config"
- check_empty_table "${daemon_dir}/database/global/db.bin" "containers_profiles"
- check_empty_table "${daemon_dir}/database/global/db.bin" "images"
- check_empty_table "${daemon_dir}/database/global/db.bin" "images_aliases"
- check_empty_table "${daemon_dir}/database/global/db.bin" "images_properties"
- check_empty_table "${daemon_dir}/database/global/db.bin" "images_source"
- check_empty_table "${daemon_dir}/database/global/db.bin" "images_nodes"
- check_empty_table "${daemon_dir}/database/global/db.bin" "networks"
- check_empty_table "${daemon_dir}/database/global/db.bin" "networks_config"
- check_empty_table "${daemon_dir}/database/global/db.bin" "profiles"
- check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_config"
- check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices"
- check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices_config"
- check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools"
- check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_nodes"
- check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_config"
- check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes"
- check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes_config"
+ # Only check for leftover db entries when we're not clustered: on a
+ # clustered node the disk dump that we take at shutdown is not
+ # guaranteed to be fully up-to-date.
+ if [ ! -f "${daemon_dir}/cluster.crt" ]; then
+ echo "==> Checking for leftover DB entries"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "containers"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "containers_config"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices_config"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "containers_profiles"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "images"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "images_aliases"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "images_properties"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "images_source"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "images_nodes"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "networks"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "networks_config"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "profiles"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_config"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices_config"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_nodes"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_config"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes"
+ check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes_config"
+ fi
fi
# teardown storage
diff --git a/test/suites/basic.sh b/test/suites/basic.sh
index 2fd97141a..dec56abbd 100644
--- a/test/suites/basic.sh
+++ b/test/suites/basic.sh
@@ -259,6 +259,12 @@ test_basic_usage() {
lxc init testimage autostart --force-local
lxd activateifneeded --debug 2>&1 | grep -q -v "activating..."
lxc config set autostart boot.autostart true --force-local
+
+ # Restart the daemon, this forces the global database to be dumped to disk.
+ shutdown_lxd "${LXD_DIR}"
+ respawn_lxd "${LXD_DIR}" true
+ lxc stop --force autostart
+
lxd activateifneeded --debug 2>&1 | grep -q "Daemon has auto-started containers, activating..."
lxc config unset autostart boot.autostart --force-local
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index ab8debbb9..28bd38913 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -138,14 +138,14 @@ test_clustering_membership() {
# Rename a node using the pre-existing name.
LXD_DIR="${LXD_ONE_DIR}" lxc cluster rename node4 node3
- # Trying to delete a container which is the only one with a copy of
+ # Trying to delete a node which is the only one with a copy of
# an image results in an error
LXD_DIR="${LXD_FOUR_DIR}" ensure_import_testimage
! LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node3
LXD_DIR="${LXD_TWO_DIR}" lxc image delete testimage
# Remove a node gracefully.
- LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node3
+ LXD_DIR="${LXD_ONE_DIR}" lxc cluster remove node3
! LXD_DIR="${LXD_FOUR_DIR}" lxc cluster list
LXD_DIR="${LXD_FIVE_DIR}" lxd shutdown
From 34db0d03a571c8d61fb88c9d65561cd8e80bc241 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 17 Jul 2018 19:53:51 +0000
Subject: [PATCH 6/7] Drop raft snapshot workaround
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/raft.go | 30 ++----------------------------
1 file changed, 2 insertions(+), 28 deletions(-)
diff --git a/lxd/cluster/raft.go b/lxd/cluster/raft.go
index ca9fc4b32..bbb4074aa 100644
--- a/lxd/cluster/raft.go
+++ b/lxd/cluster/raft.go
@@ -275,30 +275,8 @@ func (i *raftInstance) Shutdown() error {
// Invoke raft APIs asynchronously to allow for a timeout.
timeout := 10 * time.Second
- // FIXME/TODO: We take a snapshot before when shutting down the daemon
- // so there will be no uncompacted raft logs at the next
- // startup. This is a workaround for slow log replay when
- // the LXD daemon starts (see #4485). A more proper fix
- // should be probably implemented in dqlite.
errCh := make(chan error)
timer := time.After(timeout)
- go func() {
- //errCh <- i.raft.Snapshot().Error()
- errCh <- nil
- }()
- // In case of error we just log a warning, since this is not really
- // fatal.
- select {
- case err := <-errCh:
- if err != nil && err != raft.ErrNothingNewToSnapshot {
- logger.Warnf("Failed to take raft snapshot: %v", err)
- }
- case <-timer:
- logger.Warnf("Timeout waiting for raft to take a snapshot")
- }
-
- errCh = make(chan error)
- timer = time.After(timeout)
go func() {
errCh <- i.raft.Shutdown().Error()
}()
@@ -406,12 +384,8 @@ func raftConfig(latency float64) *raft.Config {
scale(duration)
}
- // FIXME/TODO: We increase the frequency of snapshots here to keep the
- // number of uncompacted raft logs low, and workaround slow
- // log replay when the LXD daemon starts (see #4485). A more
- // proper fix should be probably implemented in dqlite.
- config.SnapshotThreshold = 512
- config.TrailingLogs = 128
+ config.SnapshotThreshold = 1024
+ config.TrailingLogs = 512
return config
}
From 8107e292f2caf4ee1c1c6b3b646e358c520ccc72 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 17 Jul 2018 19:58:13 +0000
Subject: [PATCH 7/7] Fetch containers info in parallel
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/containers_get.go | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/lxd/containers_get.go b/lxd/containers_get.go
index a54c8964d..3a4fe94e4 100644
--- a/lxd/containers_get.go
+++ b/lxd/containers_get.go
@@ -80,6 +80,7 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) {
resultMu.Unlock()
}
+ wg := sync.WaitGroup{}
for address, containers := range result {
// If this is an internal request from another cluster node,
// ignore containers from other nodes, and return only the ones
@@ -100,7 +101,9 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) {
// For recursion requests we need to fetch the state of remote
// containers from their respective nodes.
if recursion && address != "" && !isClusterNotification(r) {
- func(address string, containers []string) {
+ wg.Add(1)
+ go func(address string, containers []string) {
+ defer wg.Done()
cert := d.endpoints.NetworkCert()
cs, err := doContainersGetFromNode(address, cert)
@@ -135,6 +138,7 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) {
}
}
}
+ wg.Wait()
if !recursion {
return resultString, nil
More information about the lxc-devel
mailing list