[lxc-devel] [lxd/master] Adapt to new go dqlite api
freeekanayaka on Github
lxc-bot at linuxcontainers.org
Tue Aug 27 19:14:58 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 556 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190827/48594744/attachment-0001.bin>
-------------- next part --------------
From 0ca7700a571a60e51511bc72c87fdae9ef4da45b Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Sun, 25 Aug 2019 17:35:31 +0200
Subject: [PATCH 01/15] Convert dump to new client API
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 903caa4dd5..2980c87e66 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -18,6 +18,7 @@ import (
"time"
dqlite "github.com/canonical/go-dqlite"
+ dqliteclient "github.com/canonical/go-dqlite/client"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -405,7 +406,13 @@ func (g *Gateway) Sync() {
return
}
- files, err := g.server.Dump(context.Background(), "db.bin")
+ client, err := g.getClient()
+ if err != nil {
+ logger.Warnf("Failed to get client: %v", err)
+ return
+ }
+
+ files, err := client.Dump(context.Background(), "db.bin")
if err != nil {
// Just log a warning, since this is not fatal.
logger.Warnf("Failed get database dump: %v", err)
@@ -423,6 +430,10 @@ func (g *Gateway) Sync() {
}
}
+func (g *Gateway) getClient() (*dqliteclient.Client, error) {
+ return dqliteclient.New(context.Background(), g.bindAddress)
+}
+
// Reset the gateway, shutting it down and starting against from scratch using
// the given certificate.
//
From ac95fa4f13aa34a1627b12048ddf438d78a568ac Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 14:42:48 +0200
Subject: [PATCH 02/15] Improve dqlite proxy error messages and abort both
sides on error
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 2980c87e66..e6de9585f1 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -840,17 +840,18 @@ func dqliteNetworkDial(ctx context.Context, addr string, g *Gateway, checkLeader
go func() {
_, err := io.Copy(eagain.Writer{Writer: goUnix}, eagain.Reader{Reader: conn})
if err != nil {
- logger.Warnf("Error during dqlite proxy copy: %v", err)
+ logger.Warnf("Dqlite client proxy TLS -> Unix: %v", err)
}
+ goUnix.Close()
conn.Close()
}()
go func() {
_, err := io.Copy(eagain.Writer{Writer: conn}, eagain.Reader{Reader: goUnix})
if err != nil {
- logger.Warnf("Error during dqlite proxy copy: %v", err)
+ logger.Warnf("Dqlite client proxy Unix -> TLS: %v", err)
}
-
+ conn.Close()
goUnix.Close()
}()
@@ -904,18 +905,18 @@ func runDqliteProxy(bindAddress string, acceptCh chan net.Conn) {
go func() {
_, err := io.Copy(eagain.Writer{Writer: dst}, eagain.Reader{Reader: src})
if err != nil {
- logger.Warnf("Error during dqlite proxy copy: %v", err)
+ logger.Warnf("Dqlite server proxy TLS -> Unix: %v", err)
}
-
src.Close()
+ dst.Close()
}()
go func() {
_, err := io.Copy(eagain.Writer{Writer: src}, eagain.Reader{Reader: dst})
if err != nil {
- logger.Warnf("Error during dqlite proxy copy: %v", err)
+ logger.Warnf("Dqlite server proxy Unix -> TLS: %v", err)
}
-
+ src.Close()
dst.Close()
}()
}
From 9528083eca839378c27358b3c0e64c0c245a2a2a Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 15:07:28 +0200
Subject: [PATCH 03/15] Adapt to new Server.Leader() API
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index e6de9585f1..f108baed58 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -240,12 +240,12 @@ func (g *Gateway) HandlerFuncs(nodeRefreshTask func(*APIHeartbeat)) map[string]h
// probes the node to see if it's currently the leader
// (otherwise it tries with another node or retry later).
if r.Method == "HEAD" {
- leader, err := g.server.LeaderAddress(context.Background())
+ leader, err := g.server.Leader(context.Background())
if err != nil {
http.Error(w, "500 failed to get leader address", http.StatusInternalServerError)
return
}
- if leader != g.raft.info.Address {
+ if leader == nil || leader.ID != g.raft.info.ID {
http.Error(w, "503 not leader", http.StatusServiceUnavailable)
return
}
@@ -474,12 +474,12 @@ func (g *Gateway) LeaderAddress() (string, error) {
// wait a bit until one is elected.
if g.server != nil {
for ctx.Err() == nil {
- leader, err := g.server.LeaderAddress(context.Background())
+ leader, err := g.server.Leader(context.Background())
if err != nil {
return "", errors.Wrap(err, "Failed to get leader address")
}
- if leader != "" {
- return leader, nil
+ if leader != nil {
+ return leader.Address, nil
}
time.Sleep(time.Second)
}
@@ -662,11 +662,11 @@ func (g *Gateway) isLeader() (bool, error) {
if g.server == nil {
return false, nil
}
- leader, err := g.server.LeaderAddress(context.Background())
+ leader, err := g.server.Leader(context.Background())
if err != nil {
return false, errors.Wrap(err, "Failed to get leader address")
}
- return leader == g.raft.info.Address, nil
+ return leader != nil && leader.ID == g.raft.info.ID, nil
}
// Internal error signalling that a node not the leader.
From 43431349888f98f36f2141e20ee2e3d8d8e81ea0 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 16:24:44 +0200
Subject: [PATCH 04/15] Adapt lxd/db package to new dqlite driver import
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/cluster/open.go | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/lxd/db/cluster/open.go b/lxd/db/cluster/open.go
index e580e13de1..01870a1f0e 100644
--- a/lxd/db/cluster/open.go
+++ b/lxd/db/cluster/open.go
@@ -6,7 +6,7 @@ import (
"path/filepath"
"sync/atomic"
- dqlite "github.com/canonical/go-dqlite"
+ driver "github.com/canonical/go-dqlite/driver"
"github.com/lxc/lxd/lxd/db/query"
"github.com/lxc/lxd/lxd/db/schema"
"github.com/lxc/lxd/lxd/util"
@@ -23,8 +23,8 @@ import (
//
// The dialer argument is a function that returns a gRPC dialer that can be
// used to connect to a database node using the gRPC SQL package.
-func Open(name string, store dqlite.ServerStore, options ...dqlite.DriverOption) (*sql.DB, error) {
- driver, err := dqlite.NewDriver(store, options...)
+func Open(name string, store driver.ServerStore, options ...driver.Option) (*sql.DB, error) {
+ driver, err := driver.New(store, options...)
if err != nil {
return nil, errors.Wrap(err, "Failed to create dqlite driver")
}
From ed8fd6cff08fd04b1c5fff5634d98af6299b94cf Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 16:25:56 +0200
Subject: [PATCH 05/15] Adapt lxd/db to new dqlite driver package
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/db.go | 8 ++++----
lxd/db/migration_test.go | 4 ++--
lxd/db/testing.go | 11 ++++++-----
3 files changed, 12 insertions(+), 11 deletions(-)
diff --git a/lxd/db/db.go b/lxd/db/db.go
index 341982d4d9..8166be4179 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -8,7 +8,7 @@ import (
"sync"
"time"
- dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/driver"
"github.com/pkg/errors"
"github.com/lxc/lxd/lxd/db/cluster"
@@ -163,7 +163,7 @@ type Cluster struct {
// database matches our version, and possibly trigger a schema update. If the
// schema update can't be performed right now, because some nodes are still
// behind, an Upgrading error is returned.
-func OpenCluster(name string, store dqlite.ServerStore, address, dir string, timeout time.Duration, dump *Dump, options ...dqlite.DriverOption) (*Cluster, error) {
+func OpenCluster(name string, store driver.ServerStore, address, dir string, timeout time.Duration, dump *Dump, options ...driver.Option) (*Cluster, error) {
db, err := cluster.Open(name, store, options...)
if err != nil {
return nil, errors.Wrap(err, "failed to open database")
@@ -194,7 +194,7 @@ func OpenCluster(name string, store dqlite.ServerStore, address, dir string, tim
}
cause := errors.Cause(err)
- if cause != dqlite.ErrNoAvailableLeader {
+ if cause != driver.ErrNoAvailableLeader {
return nil, err
}
@@ -315,7 +315,7 @@ func ForLocalInspectionWithPreparedStmts(db *sql.DB) (*Cluster, error) {
// SetDefaultTimeout sets the default go-dqlite driver timeout.
func (c *Cluster) SetDefaultTimeout(timeout time.Duration) {
- driver := c.db.Driver().(*dqlite.Driver)
+ driver := c.db.Driver().(*driver.Driver)
driver.SetContextTimeout(timeout)
}
diff --git a/lxd/db/migration_test.go b/lxd/db/migration_test.go
index 7e82e8b173..3726eff244 100644
--- a/lxd/db/migration_test.go
+++ b/lxd/db/migration_test.go
@@ -7,7 +7,7 @@ import (
"testing"
"time"
- dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/driver"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/db/query"
"github.com/stretchr/testify/assert"
@@ -53,7 +53,7 @@ func TestImportPreClusteringData(t *testing.T) {
cluster, err := db.OpenCluster(
"test.db", store, "1", dir, 5*time.Second, dump,
- dqlite.WithDialFunc(dial))
+ driver.WithDialFunc(dial))
require.NoError(t, err)
defer cluster.Close()
diff --git a/lxd/db/testing.go b/lxd/db/testing.go
index f2954393fb..69477252f2 100644
--- a/lxd/db/testing.go
+++ b/lxd/db/testing.go
@@ -11,6 +11,7 @@ import (
"time"
dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/driver"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -65,7 +66,7 @@ func NewTestCluster(t *testing.T) (*Cluster, func()) {
cluster, err := OpenCluster(
"test.db", store, "1", dir, 5*time.Second, nil,
- dqlite.WithLogFunc(log), dqlite.WithDialFunc(dial))
+ driver.WithLogFunc(log), driver.WithDialFunc(dial))
require.NoError(t, err)
cleanup := func() {
@@ -100,7 +101,7 @@ func NewTestClusterTx(t *testing.T) (*ClusterTx, func()) {
//
// Return the directory backing the test server and a newly created server
// store that can be used to connect to it.
-func NewTestDqliteServer(t *testing.T) (string, *dqlite.DatabaseServerStore, func()) {
+func NewTestDqliteServer(t *testing.T) (string, driver.ServerStore, func()) {
t.Helper()
listener, err := net.Listen("unix", "")
@@ -113,7 +114,7 @@ func NewTestDqliteServer(t *testing.T) (string, *dqlite.DatabaseServerStore, fun
err = os.Mkdir(filepath.Join(dir, "global"), 0755)
require.NoError(t, err)
- info := dqlite.ServerInfo{ID: uint64(1), Address: address}
+ info := driver.ServerInfo{ID: uint64(1), Address: address}
server, err := dqlite.NewServer(
info, filepath.Join(dir, "global"), dqlite.WithServerBindAddress(address))
require.NoError(t, err)
@@ -126,10 +127,10 @@ func NewTestDqliteServer(t *testing.T) (string, *dqlite.DatabaseServerStore, fun
dirCleanup()
}
- store, err := dqlite.DefaultServerStore(":memory:")
+ store, err := driver.DefaultServerStore(":memory:")
require.NoError(t, err)
ctx := context.Background()
- require.NoError(t, store.Set(ctx, []dqlite.ServerInfo{{Address: address}}))
+ require.NoError(t, store.Set(ctx, []driver.ServerInfo{{Address: address}}))
return dir, store, cleanup
}
From 6db5def2a466f3bd2526ef6c7924abf95f22105f Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 16:26:42 +0200
Subject: [PATCH 06/15] Adapt lxd/cluster to new dqlite sub-packages
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 30 +++++++++++++++---------------
lxd/cluster/gateway_test.go | 10 +++++-----
lxd/cluster/heartbeat_test.go | 4 ++--
lxd/cluster/membership_test.go | 8 ++++----
lxd/cluster/migrate_test.go | 15 ++++++++-------
lxd/cluster/raft.go | 4 ++--
lxd/cluster/raft_test.go | 6 +++---
7 files changed, 39 insertions(+), 38 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index f108baed58..8f759751f2 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -18,7 +18,7 @@ import (
"time"
dqlite "github.com/canonical/go-dqlite"
- dqliteclient "github.com/canonical/go-dqlite/client"
+ client "github.com/canonical/go-dqlite/client"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -89,7 +89,7 @@ type Gateway struct {
// but still we want to use dqlite as backend for the "cluster"
// database, to minimize the difference between code paths in
// clustering and non-clustering modes.
- memoryDial dqlite.DialFunc
+ memoryDial client.DialFunc
// Used when shutting down the daemon to cancel any ongoing gRPC
// dialing attempt.
@@ -323,7 +323,7 @@ func (g *Gateway) IsDatabaseNode() bool {
// DialFunc returns a dial function that can be used to connect to one of the
// dqlite nodes.
-func (g *Gateway) DialFunc() dqlite.DialFunc {
+func (g *Gateway) DialFunc() client.DialFunc {
return func(ctx context.Context, address string) (net.Conn, error) {
g.lock.RLock()
defer g.lock.RUnlock()
@@ -338,7 +338,7 @@ func (g *Gateway) DialFunc() dqlite.DialFunc {
}
// Dial function for establishing raft connections.
-func (g *Gateway) raftDial() dqlite.DialFunc {
+func (g *Gateway) raftDial() client.DialFunc {
return func(ctx context.Context, address string) (net.Conn, error) {
if address == "0" {
provider := raftAddressProvider{db: g.db}
@@ -363,7 +363,7 @@ func (g *Gateway) Context() context.Context {
// ServerStore returns a dqlite server store that can be used to lookup the
// addresses of known database nodes.
-func (g *Gateway) ServerStore() dqlite.ServerStore {
+func (g *Gateway) ServerStore() client.ServerStore {
return g.store
}
@@ -430,8 +430,8 @@ func (g *Gateway) Sync() {
}
}
-func (g *Gateway) getClient() (*dqliteclient.Client, error) {
- return dqliteclient.New(context.Background(), g.bindAddress)
+func (g *Gateway) getClient() (*client.Client, error) {
+ return client.New(context.Background(), g.bindAddress)
}
// Reset the gateway, shutting it down and starting against from scratch using
@@ -595,8 +595,8 @@ func (g *Gateway) init() error {
panic("unexpected server ID")
}
g.memoryDial = dqliteMemoryDial(g.bindAddress)
- g.store.inMemory = dqlite.NewInmemServerStore()
- g.store.Set(context.Background(), []dqlite.ServerInfo{raft.info})
+ g.store.inMemory = client.NewInmemServerStore()
+ g.store.Set(context.Background(), []client.ServerInfo{raft.info})
} else {
go runDqliteProxy(g.bindAddress, g.acceptCh)
g.store.inMemory = nil
@@ -630,7 +630,7 @@ func (g *Gateway) init() error {
}
g.lock.Lock()
- g.store.onDisk = dqlite.NewServerStore(g.db.DB(), "main", "raft_nodes", "address")
+ g.store.onDisk = client.NewServerStore(g.db.DB(), "main", "raft_nodes", "address")
g.lock.Unlock()
return nil
@@ -867,7 +867,7 @@ func dqliteNetworkDial(ctx context.Context, addr string, g *Gateway, checkLeader
}
// Create a dial function that connects to the local dqlite.
-func dqliteMemoryDial(bindAddress string) dqlite.DialFunc {
+func dqliteMemoryDial(bindAddress string) client.DialFunc {
return func(ctx context.Context, address string) (net.Conn, error) {
return net.Dial("unix", bindAddress)
}
@@ -924,18 +924,18 @@ func runDqliteProxy(bindAddress string, acceptCh chan net.Conn) {
// Conditionally uses the in-memory or the on-disk server store.
type dqliteServerStore struct {
- inMemory dqlite.ServerStore
- onDisk dqlite.ServerStore
+ inMemory client.ServerStore
+ onDisk client.ServerStore
}
-func (s *dqliteServerStore) Get(ctx context.Context) ([]dqlite.ServerInfo, error) {
+func (s *dqliteServerStore) Get(ctx context.Context) ([]client.ServerInfo, error) {
if s.inMemory != nil {
return s.inMemory.Get(ctx)
}
return s.onDisk.Get(ctx)
}
-func (s *dqliteServerStore) Set(ctx context.Context, servers []dqlite.ServerInfo) error {
+func (s *dqliteServerStore) Set(ctx context.Context, servers []client.ServerInfo) error {
if s.inMemory != nil {
return s.inMemory.Set(ctx, servers)
}
diff --git a/lxd/cluster/gateway_test.go b/lxd/cluster/gateway_test.go
index 7cc26541f6..fbaec68907 100644
--- a/lxd/cluster/gateway_test.go
+++ b/lxd/cluster/gateway_test.go
@@ -11,7 +11,7 @@ import (
"path/filepath"
"testing"
- dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/driver"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/shared"
@@ -56,9 +56,9 @@ func TestGateway_Single(t *testing.T) {
assert.Equal(t, "", leader)
assert.EqualError(t, err, "Node is not clustered")
- driver, err := dqlite.NewDriver(
+ driver, err := driver.New(
gateway.ServerStore(),
- dqlite.WithDialFunc(gateway.DialFunc()),
+ driver.WithDialFunc(gateway.DialFunc()),
)
require.NoError(t, err)
@@ -89,9 +89,9 @@ func TestGateway_SingleWithNetworkAddress(t *testing.T) {
mux.HandleFunc(path, handler)
}
- driver, err := dqlite.NewDriver(
+ driver, err := driver.New(
gateway.ServerStore(),
- dqlite.WithDialFunc(gateway.DialFunc()),
+ driver.WithDialFunc(gateway.DialFunc()),
)
require.NoError(t, err)
diff --git a/lxd/cluster/heartbeat_test.go b/lxd/cluster/heartbeat_test.go
index ee195f6b8e..a43c34cb45 100644
--- a/lxd/cluster/heartbeat_test.go
+++ b/lxd/cluster/heartbeat_test.go
@@ -7,7 +7,7 @@ import (
"testing"
"time"
- dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/driver"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/state"
@@ -263,7 +263,7 @@ func (f *heartbeatFixture) node() (*state.State, *cluster.Gateway, string) {
dial := gateway.DialFunc()
state.Cluster, err = db.OpenCluster(
"db.bin", store, address, "/unused/db/dir", 5*time.Second, nil,
- dqlite.WithDialFunc(dial))
+ driver.WithDialFunc(dial))
require.NoError(f.t, err)
f.gateways[len(f.gateways)] = gateway
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index 97e9c79e05..49d59f7dd7 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -8,7 +8,7 @@ import (
"testing"
"time"
- dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/driver"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/state"
@@ -261,7 +261,7 @@ func TestJoin(t *testing.T) {
targetState.Cluster, err = db.OpenCluster(
"db.bin", targetStore, targetAddress, "/unused/db/dir",
10*time.Second, nil,
- dqlite.WithDialFunc(targetDialFunc))
+ driver.WithDialFunc(targetDialFunc))
require.NoError(t, err)
targetF := &membershipFixtures{t: t, state: targetState}
@@ -298,7 +298,7 @@ func TestJoin(t *testing.T) {
state.Cluster, err = db.OpenCluster(
"db.bin", store, address, "/unused/db/dir", 5*time.Second, nil,
- dqlite.WithDialFunc(dialFunc))
+ driver.WithDialFunc(dialFunc))
require.NoError(t, err)
f := &membershipFixtures{t: t, state: state}
@@ -382,7 +382,7 @@ func FLAKY_TestPromote(t *testing.T) {
dialFunc := targetGateway.DialFunc()
targetState.Cluster, err = db.OpenCluster(
"db.bin", store, targetAddress, "/unused/db/dir", 5*time.Second, nil,
- dqlite.WithDialFunc(dialFunc))
+ driver.WithDialFunc(dialFunc))
require.NoError(t, err)
targetF := &membershipFixtures{t: t, state: targetState}
targetF.ClusterAddress(targetAddress)
diff --git a/lxd/cluster/migrate_test.go b/lxd/cluster/migrate_test.go
index 5702ce01d8..e8ce47d162 100644
--- a/lxd/cluster/migrate_test.go
+++ b/lxd/cluster/migrate_test.go
@@ -2,13 +2,14 @@ package cluster_test
import (
"context"
- "database/sql/driver"
+ sqldriver "database/sql/driver"
"io/ioutil"
"net"
"os"
"testing"
dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/driver"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/shared"
"github.com/stretchr/testify/assert"
@@ -24,7 +25,7 @@ func TestMigrateToDqlite10(t *testing.T) {
assert.NoError(t, err)
require.NoError(t, err)
- info := dqlite.ServerInfo{ID: uint64(1), Address: "1"}
+ info := driver.ServerInfo{ID: uint64(1), Address: "1"}
server, err := dqlite.NewServer(info, dir)
require.NoError(t, err)
defer server.Close()
@@ -32,27 +33,27 @@ func TestMigrateToDqlite10(t *testing.T) {
err = server.Start()
require.NoError(t, err)
- store, err := dqlite.DefaultServerStore(":memory:")
+ store, err := driver.DefaultServerStore(":memory:")
require.NoError(t, err)
- require.NoError(t, store.Set(context.Background(), []dqlite.ServerInfo{info}))
+ require.NoError(t, store.Set(context.Background(), []driver.ServerInfo{info}))
dial := func(ctx context.Context, address string) (net.Conn, error) {
return net.Dial("unix", "@dqlite-1")
}
- drv, err := dqlite.NewDriver(store, dqlite.WithDialFunc(dial))
+ drv, err := driver.New(store, driver.WithDialFunc(dial))
require.NoError(t, err)
conn, err := drv.Open("db.bin")
require.NoError(t, err)
defer conn.Close()
- queryer := conn.(driver.Queryer)
+ queryer := conn.(sqldriver.Queryer)
rows, err := queryer.Query("SELECT name FROM containers", nil)
require.NoError(t, err)
- values := make([]driver.Value, 1)
+ values := make([]sqldriver.Value, 1)
require.NoError(t, rows.Next(values))
assert.Equal(t, values[0], "c1")
diff --git a/lxd/cluster/raft.go b/lxd/cluster/raft.go
index 3abaa866cc..9d6fc408bb 100644
--- a/lxd/cluster/raft.go
+++ b/lxd/cluster/raft.go
@@ -5,7 +5,7 @@ import (
"os"
"path/filepath"
- dqlite "github.com/canonical/go-dqlite"
+ client "github.com/canonical/go-dqlite/client"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/node"
"github.com/lxc/lxd/shared"
@@ -65,7 +65,7 @@ func newRaft(database *db.Node, cert *shared.CertInfo, latency float64) (*raftIn
// A LXD-specific wrapper around raft.Raft, which also holds a reference to its
// network transport and dqlite FSM.
type raftInstance struct {
- info dqlite.ServerInfo
+ info client.ServerInfo
}
// Create a new raftFactory, instantiating all needed raft dependencies.
diff --git a/lxd/cluster/raft_test.go b/lxd/cluster/raft_test.go
index 622a9bf814..1a26f50d68 100644
--- a/lxd/cluster/raft_test.go
+++ b/lxd/cluster/raft_test.go
@@ -5,7 +5,7 @@ import (
"net/http/httptest"
"testing"
- dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/client"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/util"
@@ -26,7 +26,7 @@ func newRaft(t *testing.T, db *db.Node, cert *shared.CertInfo) *cluster.RaftInst
// address into the raft_nodes table.
//
// This effectively makes the node act as a database raft node.
-func setRaftRole(t *testing.T, database *db.Node, address string) *dqlite.DatabaseServerStore {
+func setRaftRole(t *testing.T, database *db.Node, address string) client.ServerStore {
require.NoError(t, database.Transaction(func(tx *db.NodeTx) error {
err := tx.UpdateConfig(map[string]string{"cluster.https_address": address})
if err != nil {
@@ -36,7 +36,7 @@ func setRaftRole(t *testing.T, database *db.Node, address string) *dqlite.Databa
return err
}))
- store := dqlite.NewServerStore(database.DB(), "main", "raft_nodes", "address")
+ store := client.NewServerStore(database.DB(), "main", "raft_nodes", "address")
return store
}
From 8fbace1872dd1c39b9ff9ab373692f8359722d4a Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 16:28:32 +0200
Subject: [PATCH 07/15] Adapt main package to new dqlite sub packages
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/api_cluster.go | 10 +++++-----
lxd/daemon.go | 16 ++++++++--------
lxd/response.go | 6 +++---
3 files changed, 16 insertions(+), 16 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 7bd84533e7..ab5be0cc13 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -12,7 +12,7 @@ import (
"strings"
"time"
- "github.com/canonical/go-dqlite"
+ dqlitedriver "github.com/canonical/go-dqlite/driver"
"github.com/gorilla/mux"
"github.com/pkg/errors"
@@ -20,7 +20,7 @@ import (
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/node"
- driver "github.com/lxc/lxd/lxd/storage"
+ storagedriver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
@@ -495,7 +495,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) Response {
if err != nil {
return errors.Wrap(err, "failed to init ceph pool for joining node")
}
- volumeMntPoint := driver.GetStoragePoolVolumeMountPoint(
+ volumeMntPoint := storagedriver.GetStoragePoolVolumeMountPoint(
name, storage.(*storageCeph).volume.Name)
err = os.MkdirAll(volumeMntPoint, 0711)
if err != nil {
@@ -657,8 +657,8 @@ func clusterPutDisable(d *Daemon) Response {
"db.bin", store, address, "/unused/db/dir",
d.config.DqliteSetupTimeout,
nil,
- dqlite.WithDialFunc(d.gateway.DialFunc()),
- dqlite.WithContext(d.gateway.Context()),
+ dqlitedriver.WithDialFunc(d.gateway.DialFunc()),
+ dqlitedriver.WithContext(d.gateway.Context()),
)
if err != nil {
return SmartError(err)
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 65afb070dd..b336ca69b3 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -5,7 +5,7 @@ import (
"context"
"crypto/x509"
"database/sql"
- "database/sql/driver"
+ sqldriver "database/sql/driver"
"fmt"
"io"
"net/http"
@@ -17,7 +17,7 @@ import (
"time"
"github.com/CanonicalLtd/candidclient"
- dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/driver"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
@@ -689,11 +689,11 @@ func (d *Daemon) init() error {
d.cluster, err = db.OpenCluster(
"db.bin", store, clusterAddress, dir,
d.config.DqliteSetupTimeout, dump,
- dqlite.WithDialFunc(d.gateway.DialFunc()),
- dqlite.WithContext(d.gateway.Context()),
- dqlite.WithConnectionTimeout(10*time.Second),
- dqlite.WithContextTimeout(contextTimeout),
- dqlite.WithLogFunc(cluster.DqliteLog),
+ driver.WithDialFunc(d.gateway.DialFunc()),
+ driver.WithContext(d.gateway.Context()),
+ driver.WithConnectionTimeout(10*time.Second),
+ driver.WithContextTimeout(contextTimeout),
+ driver.WithLogFunc(cluster.DqliteLog),
)
if err == nil {
break
@@ -1041,7 +1041,7 @@ func (d *Daemon) Stop() error {
// If we got io.EOF the network connection was interrupted and
// it's likely that the other node shutdown. Let's just log the
// event and return cleanly.
- if errors.Cause(err) == driver.ErrBadConn {
+ if errors.Cause(err) == sqldriver.ErrBadConn {
logger.Debugf("Could not close remote database cleanly: %v", err)
} else {
trackError(err)
diff --git a/lxd/response.go b/lxd/response.go
index 3b9e6fe9c9..76b58b3b3c 100644
--- a/lxd/response.go
+++ b/lxd/response.go
@@ -11,8 +11,8 @@ import (
"os"
"time"
- dqlite "github.com/canonical/go-dqlite"
- "github.com/mattn/go-sqlite3"
+ "github.com/canonical/go-dqlite/driver"
+ sqlite3 "github.com/mattn/go-sqlite3"
"github.com/pkg/errors"
lxd "github.com/lxc/lxd/client"
@@ -541,7 +541,7 @@ func SmartError(err error) Response {
}
return Conflict(nil)
- case dqlite.ErrNoAvailableLeader:
+ case driver.ErrNoAvailableLeader:
return Unavailable(err)
default:
return InternalError(err)
From 772d76c7aba5451c2fe9f60efb508ae610de622b Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 17:00:32 +0200
Subject: [PATCH 08/15] Adapt lxd/db/cluster to Server -> Node rename
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/cluster/open.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lxd/db/cluster/open.go b/lxd/db/cluster/open.go
index 01870a1f0e..cffb2b50c6 100644
--- a/lxd/db/cluster/open.go
+++ b/lxd/db/cluster/open.go
@@ -23,7 +23,7 @@ import (
//
// The dialer argument is a function that returns a gRPC dialer that can be
// used to connect to a database node using the gRPC SQL package.
-func Open(name string, store driver.ServerStore, options ...driver.Option) (*sql.DB, error) {
+func Open(name string, store driver.NodeStore, options ...driver.Option) (*sql.DB, error) {
driver, err := driver.New(store, options...)
if err != nil {
return nil, errors.Wrap(err, "Failed to create dqlite driver")
From b28073664489ec8d50e66e298a26327f387b3f6d Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 17:01:08 +0200
Subject: [PATCH 09/15] Adapt lxd/db to Server -> Node rename
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/db.go | 2 +-
lxd/db/testing.go | 17 +++++++++--------
2 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/lxd/db/db.go b/lxd/db/db.go
index 8166be4179..feec9af821 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -163,7 +163,7 @@ type Cluster struct {
// database matches our version, and possibly trigger a schema update. If the
// schema update can't be performed right now, because some nodes are still
// behind, an Upgrading error is returned.
-func OpenCluster(name string, store driver.ServerStore, address, dir string, timeout time.Duration, dump *Dump, options ...driver.Option) (*Cluster, error) {
+func OpenCluster(name string, store driver.NodeStore, address, dir string, timeout time.Duration, dump *Dump, options ...driver.Option) (*Cluster, error) {
db, err := cluster.Open(name, store, options...)
if err != nil {
return nil, errors.Wrap(err, "failed to open database")
diff --git a/lxd/db/testing.go b/lxd/db/testing.go
index 69477252f2..8ed351af2f 100644
--- a/lxd/db/testing.go
+++ b/lxd/db/testing.go
@@ -11,6 +11,7 @@ import (
"time"
dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/driver"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -101,7 +102,7 @@ func NewTestClusterTx(t *testing.T) (*ClusterTx, func()) {
//
// Return the directory backing the test server and a newly created server
// store that can be used to connect to it.
-func NewTestDqliteServer(t *testing.T) (string, driver.ServerStore, func()) {
+func NewTestDqliteServer(t *testing.T) (string, driver.NodeStore, func()) {
t.Helper()
listener, err := net.Listen("unix", "")
@@ -114,9 +115,9 @@ func NewTestDqliteServer(t *testing.T) (string, driver.ServerStore, func()) {
err = os.Mkdir(filepath.Join(dir, "global"), 0755)
require.NoError(t, err)
- info := driver.ServerInfo{ID: uint64(1), Address: address}
- server, err := dqlite.NewServer(
- info, filepath.Join(dir, "global"), dqlite.WithServerBindAddress(address))
+ info := driver.NodeInfo{ID: uint64(1), Address: address}
+ server, err := dqlite.New(
+ info, filepath.Join(dir, "global"), dqlite.WithBindAddress(address))
require.NoError(t, err)
err = server.Start()
@@ -127,10 +128,10 @@ func NewTestDqliteServer(t *testing.T) (string, driver.ServerStore, func()) {
dirCleanup()
}
- store, err := driver.DefaultServerStore(":memory:")
+ store, err := driver.DefaultNodeStore(":memory:")
require.NoError(t, err)
ctx := context.Background()
- require.NoError(t, store.Set(ctx, []driver.ServerInfo{{Address: address}}))
+ require.NoError(t, store.Set(ctx, []driver.NodeInfo{{Address: address}}))
return dir, store, cleanup
}
@@ -156,8 +157,8 @@ func newDir(t *testing.T) (string, func()) {
return dir, cleanup
}
-func newLogFunc(t *testing.T) dqlite.LogFunc {
- return func(l dqlite.LogLevel, format string, a ...interface{}) {
+func newLogFunc(t *testing.T) client.LogFunc {
+ return func(l client.LogLevel, format string, a ...interface{}) {
format = fmt.Sprintf("%s: %s", l.String(), format)
t.Logf(format, a...)
}
From b8dda7921c209c9512ad46dd05aa6b15412debf0 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 17:01:35 +0200
Subject: [PATCH 10/15] Adapt lxd/cluster to Server -> Node rename
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 47 +++++++++++++++++-----------------
lxd/cluster/gateway_test.go | 4 +--
lxd/cluster/heartbeat_test.go | 2 +-
lxd/cluster/membership.go | 6 ++---
lxd/cluster/membership_test.go | 6 ++---
lxd/cluster/migrate_test.go | 8 +++---
lxd/cluster/raft.go | 2 +-
lxd/cluster/raft_test.go | 4 +--
8 files changed, 39 insertions(+), 40 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 8f759751f2..3957d7d5ab 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -54,7 +54,7 @@ func NewGateway(db *db.Node, cert *shared.CertInfo, options ...Option) (*Gateway
cancel: cancel,
upgradeCh: make(chan struct{}, 16),
acceptCh: make(chan net.Conn),
- store: &dqliteServerStore{},
+ store: &dqliteNodeStore{},
}
err := gateway.init()
@@ -80,7 +80,7 @@ type Gateway struct {
// The gRPC server exposing the dqlite driver created by this
// gateway. It's nil if this LXD node is not supposed to be part of the
// raft cluster.
- server *dqlite.Server
+ server *dqlite.Node
acceptCh chan net.Conn
// A dialer that will connect to the dqlite server using a loopback
@@ -108,8 +108,8 @@ type Gateway struct {
Cluster *db.Cluster
HeartbeatNodeHook func(*APIHeartbeat)
- // ServerStore wrapper.
- store *dqliteServerStore
+ // NodeStore wrapper.
+ store *dqliteNodeStore
lock sync.RWMutex
@@ -361,9 +361,9 @@ func (g *Gateway) Context() context.Context {
return g.ctx
}
-// ServerStore returns a dqlite server store that can be used to lookup the
+// NodeStore returns a dqlite server store that can be used to lookup the
// addresses of known database nodes.
-func (g *Gateway) ServerStore() client.ServerStore {
+func (g *Gateway) NodeStore() client.NodeStore {
return g.store
}
@@ -585,9 +585,8 @@ func (g *Gateway) init() error {
g.bindAddress = listener.Addr().String()
listener.Close()
- options := []dqlite.ServerOption{
- dqlite.WithServerLogFunc(DqliteLog),
- dqlite.WithServerBindAddress(g.bindAddress),
+ options := []dqlite.Option{
+ dqlite.WithBindAddress(g.bindAddress),
}
if raft.info.Address == "1" {
@@ -595,15 +594,15 @@ func (g *Gateway) init() error {
panic("unexpected server ID")
}
g.memoryDial = dqliteMemoryDial(g.bindAddress)
- g.store.inMemory = client.NewInmemServerStore()
- g.store.Set(context.Background(), []client.ServerInfo{raft.info})
+ g.store.inMemory = client.NewInmemNodeStore()
+ g.store.Set(context.Background(), []client.NodeInfo{raft.info})
} else {
go runDqliteProxy(g.bindAddress, g.acceptCh)
g.store.inMemory = nil
- options = append(options, dqlite.WithServerDialFunc(g.raftDial()))
+ options = append(options, dqlite.WithDialFunc(g.raftDial()))
}
- server, err := dqlite.NewServer(
+ server, err := dqlite.New(
raft.info,
dir,
options...,
@@ -630,7 +629,7 @@ func (g *Gateway) init() error {
}
g.lock.Lock()
- g.store.onDisk = client.NewServerStore(g.db.DB(), "main", "raft_nodes", "address")
+ g.store.onDisk = client.NewNodeStore(g.db.DB(), "main", "raft_nodes", "address")
g.lock.Unlock()
return nil
@@ -878,16 +877,16 @@ func dqliteMemoryDial(bindAddress string) client.DialFunc {
const databaseEndpoint = "/internal/database"
// DqliteLog redirects dqlite's logs to our own logger
-func DqliteLog(l dqlite.LogLevel, format string, a ...interface{}) {
+func DqliteLog(l client.LogLevel, format string, a ...interface{}) {
format = fmt.Sprintf("Dqlite: %s", format)
switch l {
- case dqlite.LogDebug:
+ case client.LogDebug:
logger.Debugf(format, a...)
- case dqlite.LogInfo:
+ case client.LogInfo:
logger.Debugf(format, a...)
- case dqlite.LogWarn:
+ case client.LogWarn:
logger.Warnf(format, a...)
- case dqlite.LogError:
+ case client.LogError:
logger.Errorf(format, a...)
}
}
@@ -923,19 +922,19 @@ func runDqliteProxy(bindAddress string, acceptCh chan net.Conn) {
}
// Conditionally uses the in-memory or the on-disk server store.
-type dqliteServerStore struct {
- inMemory client.ServerStore
- onDisk client.ServerStore
+type dqliteNodeStore struct {
+ inMemory client.NodeStore
+ onDisk client.NodeStore
}
-func (s *dqliteServerStore) Get(ctx context.Context) ([]client.ServerInfo, error) {
+func (s *dqliteNodeStore) Get(ctx context.Context) ([]client.NodeInfo, error) {
if s.inMemory != nil {
return s.inMemory.Get(ctx)
}
return s.onDisk.Get(ctx)
}
-func (s *dqliteServerStore) Set(ctx context.Context, servers []client.ServerInfo) error {
+func (s *dqliteNodeStore) Set(ctx context.Context, servers []client.NodeInfo) error {
if s.inMemory != nil {
return s.inMemory.Set(ctx, servers)
}
diff --git a/lxd/cluster/gateway_test.go b/lxd/cluster/gateway_test.go
index fbaec68907..925eee233b 100644
--- a/lxd/cluster/gateway_test.go
+++ b/lxd/cluster/gateway_test.go
@@ -57,7 +57,7 @@ func TestGateway_Single(t *testing.T) {
assert.EqualError(t, err, "Node is not clustered")
driver, err := driver.New(
- gateway.ServerStore(),
+ gateway.NodeStore(),
driver.WithDialFunc(gateway.DialFunc()),
)
require.NoError(t, err)
@@ -90,7 +90,7 @@ func TestGateway_SingleWithNetworkAddress(t *testing.T) {
}
driver, err := driver.New(
- gateway.ServerStore(),
+ gateway.NodeStore(),
driver.WithDialFunc(gateway.DialFunc()),
)
require.NoError(t, err)
diff --git a/lxd/cluster/heartbeat_test.go b/lxd/cluster/heartbeat_test.go
index a43c34cb45..4b4e247dc0 100644
--- a/lxd/cluster/heartbeat_test.go
+++ b/lxd/cluster/heartbeat_test.go
@@ -259,7 +259,7 @@ func (f *heartbeatFixture) node() (*state.State, *cluster.Gateway, string) {
var err error
require.NoError(f.t, state.Cluster.Close())
- store := gateway.ServerStore()
+ store := gateway.NodeStore()
dial := gateway.DialFunc()
state.Cluster, err = db.OpenCluster(
"db.bin", store, address, "/unused/db/dir", 5*time.Second, nil,
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 56f65498f2..6e2fbe5068 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -329,7 +329,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
log15.Ctx{"id": id, "address": address, "target": target.Address})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- err := gateway.server.Join(ctx, gateway.ServerStore(), gateway.raftDial())
+ err := gateway.server.Join(ctx, gateway.NodeStore(), gateway.raftDial())
if err != nil {
return err
}
@@ -610,7 +610,7 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- err = gateway.server.Join(ctx, gateway.ServerStore(), gateway.raftDial())
+ err = gateway.server.Join(ctx, gateway.NodeStore(), gateway.raftDial())
if err != nil {
return err
}
@@ -698,7 +698,7 @@ func Leave(state *state.State, gateway *Gateway, name string, force bool) (strin
log15.Ctx{"id": id, "address": address, "target": target})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- err = dqlite.Leave(ctx, uint64(id), gateway.ServerStore(), gateway.raftDial())
+ err = dqlite.Leave(ctx, uint64(id), gateway.NodeStore(), gateway.raftDial())
if err != nil {
return "", err
}
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index 49d59f7dd7..01137857d9 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -254,7 +254,7 @@ func TestJoin(t *testing.T) {
require.NoError(t, targetState.Cluster.Close())
- targetStore := targetGateway.ServerStore()
+ targetStore := targetGateway.NodeStore()
targetDialFunc := targetGateway.DialFunc()
var err error
@@ -293,7 +293,7 @@ func TestJoin(t *testing.T) {
require.NoError(t, state.Cluster.Close())
- store := gateway.ServerStore()
+ store := gateway.NodeStore()
dialFunc := gateway.DialFunc()
state.Cluster, err = db.OpenCluster(
@@ -378,7 +378,7 @@ func FLAKY_TestPromote(t *testing.T) {
targetAddress := targetServer.Listener.Addr().String()
var err error
require.NoError(t, targetState.Cluster.Close())
- store := targetGateway.ServerStore()
+ store := targetGateway.NodeStore()
dialFunc := targetGateway.DialFunc()
targetState.Cluster, err = db.OpenCluster(
"db.bin", store, targetAddress, "/unused/db/dir", 5*time.Second, nil,
diff --git a/lxd/cluster/migrate_test.go b/lxd/cluster/migrate_test.go
index e8ce47d162..e17d96571c 100644
--- a/lxd/cluster/migrate_test.go
+++ b/lxd/cluster/migrate_test.go
@@ -25,18 +25,18 @@ func TestMigrateToDqlite10(t *testing.T) {
assert.NoError(t, err)
require.NoError(t, err)
- info := driver.ServerInfo{ID: uint64(1), Address: "1"}
- server, err := dqlite.NewServer(info, dir)
+ info := driver.NodeInfo{ID: uint64(1), Address: "1"}
+ server, err := dqlite.New(info, dir)
require.NoError(t, err)
defer server.Close()
err = server.Start()
require.NoError(t, err)
- store, err := driver.DefaultServerStore(":memory:")
+ store, err := driver.DefaultNodeStore(":memory:")
require.NoError(t, err)
- require.NoError(t, store.Set(context.Background(), []driver.ServerInfo{info}))
+ require.NoError(t, store.Set(context.Background(), []driver.NodeInfo{info}))
dial := func(ctx context.Context, address string) (net.Conn, error) {
return net.Dial("unix", "@dqlite-1")
diff --git a/lxd/cluster/raft.go b/lxd/cluster/raft.go
index 9d6fc408bb..5db2c695c8 100644
--- a/lxd/cluster/raft.go
+++ b/lxd/cluster/raft.go
@@ -65,7 +65,7 @@ func newRaft(database *db.Node, cert *shared.CertInfo, latency float64) (*raftIn
// A LXD-specific wrapper around raft.Raft, which also holds a reference to its
// network transport and dqlite FSM.
type raftInstance struct {
- info client.ServerInfo
+ info client.NodeInfo
}
// Create a new raftFactory, instantiating all needed raft dependencies.
diff --git a/lxd/cluster/raft_test.go b/lxd/cluster/raft_test.go
index 1a26f50d68..563f43abff 100644
--- a/lxd/cluster/raft_test.go
+++ b/lxd/cluster/raft_test.go
@@ -26,7 +26,7 @@ func newRaft(t *testing.T, db *db.Node, cert *shared.CertInfo) *cluster.RaftInst
// address into the raft_nodes table.
//
// This effectively makes the node act as a database raft node.
-func setRaftRole(t *testing.T, database *db.Node, address string) client.ServerStore {
+func setRaftRole(t *testing.T, database *db.Node, address string) client.NodeStore {
require.NoError(t, database.Transaction(func(tx *db.NodeTx) error {
err := tx.UpdateConfig(map[string]string{"cluster.https_address": address})
if err != nil {
@@ -36,7 +36,7 @@ func setRaftRole(t *testing.T, database *db.Node, address string) client.ServerS
return err
}))
- store := client.NewServerStore(database.DB(), "main", "raft_nodes", "address")
+ store := client.NewNodeStore(database.DB(), "main", "raft_nodes", "address")
return store
}
From ff3cd5bd9591c497a44502697258f91ddc297912 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 17:01:48 +0200
Subject: [PATCH 11/15] Adapt main package to Server -> Node rename
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/api_cluster.go | 2 +-
lxd/daemon.go | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index ab5be0cc13..0caabc658c 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -652,7 +652,7 @@ func clusterPutDisable(d *Daemon) Response {
if err != nil {
return SmartError(err)
}
- store := d.gateway.ServerStore()
+ store := d.gateway.NodeStore()
d.cluster, err = db.OpenCluster(
"db.bin", store, address, "/unused/db/dir",
d.config.DqliteSetupTimeout,
diff --git a/lxd/daemon.go b/lxd/daemon.go
index b336ca69b3..61170d95eb 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -676,7 +676,7 @@ func (d *Daemon) init() error {
logger.Info("Initializing global database")
dir := filepath.Join(d.os.VarDir, "database")
- store := d.gateway.ServerStore()
+ store := d.gateway.NodeStore()
contextTimeout := 5 * time.Second
if !clustered {
From 0eeab3c76668335d27123f2742c5620ec344078b Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 17:06:51 +0200
Subject: [PATCH 12/15] Use Client.Cluster() API
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 3957d7d5ab..379893488c 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -411,6 +411,7 @@ func (g *Gateway) Sync() {
logger.Warnf("Failed to get client: %v", err)
return
}
+ defer client.Close()
files, err := client.Dump(context.Background(), "db.bin")
if err != nil {
@@ -689,7 +690,13 @@ func (g *Gateway) currentRaftNodes() ([]db.RaftNode, error) {
if !isLeader {
return nil, errNotLeader
}
- servers, err := g.server.Cluster(context.Background())
+ client, err := g.getClient()
+ if err != nil {
+ return nil, err
+ }
+ defer client.Close()
+
+ servers, err := client.Cluster(context.Background())
if err != nil {
return nil, err
}
From 2c27e835a8f9a6b976a2375687276700a0ea472a Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 17:11:30 +0200
Subject: [PATCH 13/15] Use Client.Leader() API
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 20 +++++++++++++++++---
1 file changed, 17 insertions(+), 3 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 379893488c..13ce49aad8 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -240,7 +240,13 @@ func (g *Gateway) HandlerFuncs(nodeRefreshTask func(*APIHeartbeat)) map[string]h
// probes the node to see if it's currently the leader
// (otherwise it tries with another node or retry later).
if r.Method == "HEAD" {
- leader, err := g.server.Leader(context.Background())
+ client, err := g.getClient()
+ if err != nil {
+ http.Error(w, "500 failed to get dqlite client", http.StatusInternalServerError)
+ return
+ }
+ defer client.Close()
+ leader, err := client.Leader(context.Background())
if err != nil {
http.Error(w, "500 failed to get leader address", http.StatusInternalServerError)
return
@@ -475,7 +481,11 @@ func (g *Gateway) LeaderAddress() (string, error) {
// wait a bit until one is elected.
if g.server != nil {
for ctx.Err() == nil {
- leader, err := g.server.Leader(context.Background())
+ client, err := g.getClient()
+ if err != nil {
+ return "", errors.Wrap(err, "Failed to get dqlite client")
+ }
+ leader, err := client.Leader(context.Background())
if err != nil {
return "", errors.Wrap(err, "Failed to get leader address")
}
@@ -662,7 +672,11 @@ func (g *Gateway) isLeader() (bool, error) {
if g.server == nil {
return false, nil
}
- leader, err := g.server.Leader(context.Background())
+ client, err := g.getClient()
+ if err != nil {
+ return false, errors.Wrap(err, "Failed to get dqlite client")
+ }
+ leader, err := client.Leader(context.Background())
if err != nil {
return false, errors.Wrap(err, "Failed to get leader address")
}
From be04719dffe67545c90ba225675e1d004fb2a32e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 17:41:40 +0200
Subject: [PATCH 14/15] Use Client.Add() API
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 24 ++++++++++++++++++++----
1 file changed, 20 insertions(+), 4 deletions(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 6e2fbe5068..1419cdfdb1 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -9,6 +9,7 @@ import (
"time"
dqlite "github.com/canonical/go-dqlite"
+ "github.com/canonical/go-dqlite/client"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/db/cluster"
"github.com/lxc/lxd/lxd/node"
@@ -329,9 +330,18 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
log15.Ctx{"id": id, "address": address, "target": target.Address})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- err := gateway.server.Join(ctx, gateway.NodeStore(), gateway.raftDial())
+ client, err := client.FindLeader(
+ ctx, gateway.NodeStore(),
+ client.WithDialFunc(gateway.raftDial()),
+ client.WithLogFunc(DqliteLog),
+ )
if err != nil {
- return err
+ return errors.Wrap(err, "Failed to connect to cluster leader")
+ }
+ defer client.Close()
+ err = client.Add(ctx, gateway.raft.info)
+ if err != nil {
+ return errors.Wrap(err, "Failed to join cluster")
}
} else {
logger.Info("Joining cluster as non-database node")
@@ -610,9 +620,15 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- err = gateway.server.Join(ctx, gateway.NodeStore(), gateway.raftDial())
+
+ client, err := client.FindLeader(ctx, gateway.NodeStore(), client.WithDialFunc(gateway.raftDial()))
if err != nil {
- return err
+ return errors.Wrap(err, "Failed to connect to cluster leader")
+ }
+ defer client.Close()
+ err = client.Add(ctx, gateway.raft.info)
+ if err != nil {
+ return errors.Wrap(err, "Failed to join cluster")
}
// Unlock regular access to our cluster database, and make sure our
From a42f3341f63bba078813017161dd24c20d46a2f8 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 27 Aug 2019 17:47:42 +0200
Subject: [PATCH 15/15] Use Client.Remove() API
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 1419cdfdb1..98744d8311 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -8,7 +8,6 @@ import (
"strconv"
"time"
- dqlite "github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/client"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/db/cluster"
@@ -714,10 +713,21 @@ func Leave(state *state.State, gateway *Gateway, name string, force bool) (strin
log15.Ctx{"id": id, "address": address, "target": target})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- err = dqlite.Leave(ctx, uint64(id), gateway.NodeStore(), gateway.raftDial())
+
+ client, err := client.FindLeader(
+ ctx, gateway.NodeStore(),
+ client.WithDialFunc(gateway.raftDial()),
+ client.WithLogFunc(DqliteLog),
+ )
if err != nil {
- return "", err
+ return "", errors.Wrap(err, "Failed to connect to cluster leader")
+ }
+ defer client.Close()
+ err = client.Remove(ctx, id)
+ if err != nil {
+ return "", errors.Wrap(err, "Failed to leave the cluster")
}
+
return address, nil
}
More information about the lxc-devel
mailing list