[lxc-devel] [lxd/master] Move events to a separate package
monstermunchkin on Github
lxc-bot at linuxcontainers.org
Wed Sep 25 07:46:52 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190925/1aeb21b2/attachment-0001.bin>
-------------- next part --------------
From 5662fd6289f09760d1bc500023ebd9b1cab95c9a Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Wed, 25 Sep 2019 09:31:45 +0200
Subject: [PATCH 1/2] lxd: Move events to new events package
This moves part of the event handling into a separate events package.
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
lxd/container.go | 3 +-
lxd/container_lxc.go | 27 ++---
lxd/daemon.go | 3 +-
lxd/devlxd.go | 41 +++----
lxd/events.go | 191 +------------------------------
lxd/events/events.go | 259 +++++++++++++++++++++++++++++++++++++++++++
lxd/main.go | 7 +-
lxd/main_forkdns.go | 3 +-
lxd/operations.go | 21 ++--
9 files changed, 317 insertions(+), 238 deletions(-)
create mode 100644 lxd/events/events.go
diff --git a/lxd/container.go b/lxd/container.go
index 45c7687dba..722db5fcef 100644
--- a/lxd/container.go
+++ b/lxd/container.go
@@ -20,6 +20,7 @@ import (
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/device"
"github.com/lxc/lxd/lxd/device/config"
+ "github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/lxd/instance"
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/lxd/sys"
@@ -681,7 +682,7 @@ func containerCreateAsSnapshot(s *state.State, args db.ContainerArgs, sourceInst
os.RemoveAll(sourceInstance.StatePath())
}
- eventSendLifecycle(sourceInstance.Project(), "container-snapshot-created",
+ events.SendLifecycle(sourceInstance.Project(), "container-snapshot-created",
fmt.Sprintf("/1.0/containers/%s", sourceInstance.Name()),
map[string]interface{}{
"snapshot_name": args.Name,
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index 67e2e5f2d6..c75d4774da 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -30,6 +30,7 @@ import (
"github.com/lxc/lxd/lxd/db/query"
"github.com/lxc/lxd/lxd/device"
"github.com/lxc/lxd/lxd/device/config"
+ "github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/lxd/instance"
"github.com/lxc/lxd/lxd/maas"
"github.com/lxc/lxd/lxd/project"
@@ -517,7 +518,7 @@ func containerLXCCreate(s *state.State, args db.ContainerArgs) (container, error
}
logger.Info("Created container", ctxMap)
- eventSendLifecycle(c.project, "container-created",
+ events.SendLifecycle(c.project, "container-created",
fmt.Sprintf("/1.0/containers/%s", c.name), nil)
return c, nil
@@ -2688,7 +2689,7 @@ func (c *containerLXC) Start(stateful bool) error {
}
logger.Info("Started container", ctxMap)
- eventSendLifecycle(c.project, "container-started",
+ events.SendLifecycle(c.project, "container-started",
fmt.Sprintf("/1.0/containers/%s", c.name), nil)
return nil
@@ -2856,7 +2857,7 @@ func (c *containerLXC) Stop(stateful bool) error {
op.Done(nil)
logger.Info("Stopped container", ctxMap)
- eventSendLifecycle(c.project, "container-stopped",
+ events.SendLifecycle(c.project, "container-stopped",
fmt.Sprintf("/1.0/containers/%s", c.name), nil)
return nil
} else if shared.PathExists(c.StatePath()) {
@@ -2912,7 +2913,7 @@ func (c *containerLXC) Stop(stateful bool) error {
}
logger.Info("Stopped container", ctxMap)
- eventSendLifecycle(c.project, "container-stopped",
+ events.SendLifecycle(c.project, "container-stopped",
fmt.Sprintf("/1.0/containers/%s", c.name), nil)
return nil
@@ -2973,7 +2974,7 @@ func (c *containerLXC) Shutdown(timeout time.Duration) error {
}
logger.Info("Shut down container", ctxMap)
- eventSendLifecycle(c.project, "container-shutdown",
+ events.SendLifecycle(c.project, "container-shutdown",
fmt.Sprintf("/1.0/containers/%s", c.name), nil)
return nil
@@ -3166,7 +3167,7 @@ func (c *containerLXC) Freeze() error {
}
logger.Info("Froze container", ctxMap)
- eventSendLifecycle(c.project, "container-paused",
+ events.SendLifecycle(c.project, "container-paused",
fmt.Sprintf("/1.0/containers/%s", c.name), nil)
return err
@@ -3211,7 +3212,7 @@ func (c *containerLXC) Unfreeze() error {
}
logger.Info("Unfroze container", ctxMap)
- eventSendLifecycle(c.project, "container-resumed",
+ events.SendLifecycle(c.project, "container-resumed",
fmt.Sprintf("/1.0/containers/%s", c.name), nil)
return err
@@ -3605,7 +3606,7 @@ func (c *containerLXC) Restore(sourceContainer Instance, stateful bool) error {
return nil
}
- eventSendLifecycle(c.project, "container-snapshot-restored",
+ events.SendLifecycle(c.project, "container-snapshot-restored",
fmt.Sprintf("/1.0/containers/%s", c.name), map[string]interface{}{
"snapshot_name": c.name,
})
@@ -3762,12 +3763,12 @@ func (c *containerLXC) Delete() error {
logger.Info("Deleted container", ctxMap)
if c.IsSnapshot() {
- eventSendLifecycle(c.project, "container-snapshot-deleted",
+ events.SendLifecycle(c.project, "container-snapshot-deleted",
fmt.Sprintf("/1.0/containers/%s", c.name), map[string]interface{}{
"snapshot_name": c.name,
})
} else {
- eventSendLifecycle(c.project, "container-deleted",
+ events.SendLifecycle(c.project, "container-deleted",
fmt.Sprintf("/1.0/containers/%s", c.name), nil)
}
@@ -3928,13 +3929,13 @@ func (c *containerLXC) Rename(newName string) error {
logger.Info("Renamed container", ctxMap)
if c.IsSnapshot() {
- eventSendLifecycle(c.project, "container-snapshot-renamed",
+ events.SendLifecycle(c.project, "container-snapshot-renamed",
fmt.Sprintf("/1.0/containers/%s", oldName), map[string]interface{}{
"new_name": newName,
"snapshot_name": oldName,
})
} else {
- eventSendLifecycle(c.project, "container-renamed",
+ events.SendLifecycle(c.project, "container-renamed",
fmt.Sprintf("/1.0/containers/%s", oldName), map[string]interface{}{
"new_name": newName,
})
@@ -4832,7 +4833,7 @@ func (c *containerLXC) Update(args db.ContainerArgs, userRequested bool) error {
endpoint = fmt.Sprintf("/1.0/containers/%s", c.name)
}
- eventSendLifecycle(c.project, "container-updated", endpoint, nil)
+ events.SendLifecycle(c.project, "container-updated", endpoint, nil)
return nil
}
diff --git a/lxd/daemon.go b/lxd/daemon.go
index d9a0f1d3a7..2d2d433ee5 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -32,6 +32,7 @@ import (
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/device"
"github.com/lxc/lxd/lxd/endpoints"
+ "github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/lxd/maas"
"github.com/lxc/lxd/lxd/node"
"github.com/lxc/lxd/lxd/rbac"
@@ -919,7 +920,7 @@ func (d *Daemon) startClusterTasks() {
d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))
// Events
- d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, eventForward))
+ d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, events.Forward))
// Auto-sync images across the cluster (daily)
d.clusterTasks.Add(autoSyncImagesTask(d))
diff --git a/lxd/devlxd.go b/lxd/devlxd.go
index 7ceeb96685..60bbdae845 100644
--- a/lxd/devlxd.go
+++ b/lxd/devlxd.go
@@ -17,8 +17,8 @@ import (
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
- "github.com/pborman/uuid"
+ "github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/lxd/instance"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -104,7 +104,7 @@ var devlxdMetadataGet = devLxdHandler{"/1.0/meta-data", func(d *Daemon, c contai
}}
var devlxdEventsLock sync.Mutex
-var devlxdEventListeners map[int]map[string]*eventListener = make(map[int]map[string]*eventListener)
+var devlxdEventListeners map[int]map[string]*events.Listener = make(map[int]map[string]*events.Listener)
var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d *Daemon, c container, w http.ResponseWriter, r *http.Request) *devLxdResponse {
typeStr := r.FormValue("type")
@@ -117,26 +117,20 @@ var devlxdEventsGet = devLxdHandler{"/1.0/events", func(d *Daemon, c container,
return &devLxdResponse{"internal server error", http.StatusInternalServerError, "raw"}
}
- listener := eventListener{
- project: c.Project(),
- active: make(chan bool, 1),
- connection: conn,
- id: uuid.NewRandom().String(),
- messageTypes: strings.Split(typeStr, ","),
- }
+ listener := events.NewEventListener(c.Project(), conn, strings.Split(typeStr, ","), "", false)
devlxdEventsLock.Lock()
cid := c.Id()
_, ok := devlxdEventListeners[cid]
if !ok {
- devlxdEventListeners[cid] = map[string]*eventListener{}
+ devlxdEventListeners[cid] = map[string]*events.Listener{}
}
- devlxdEventListeners[cid][listener.id] = &listener
+ devlxdEventListeners[cid][listener.ID()] = listener
devlxdEventsLock.Unlock()
- logger.Debugf("New container event listener for '%s': %s", c.Name(), listener.id)
+ logger.Debugf("New container event listener for '%s': %s", c.Name(), listener.ID())
- <-listener.active
+ listener.Wait()
return &devLxdResponse{"websocket", http.StatusOK, "websocket"}
}}
@@ -161,37 +155,36 @@ func devlxdEventSend(c container, eventType string, eventMessage interface{}) er
}
for _, listener := range listeners {
- if !shared.StringInSlice(eventType, listener.messageTypes) {
+ if !shared.StringInSlice(eventType, listener.MessageTypes()) {
continue
}
- go func(listener *eventListener, body []byte) {
+ go func(listener *events.Listener, body []byte) {
// Check that the listener still exists
if listener == nil {
return
}
// Ensure there is only a single even going out at the time
- listener.lock.Lock()
- defer listener.lock.Unlock()
+ listener.Lock()
+ defer listener.Unlock()
// Make sure we're not done already
- if listener.done {
+ if listener.IsDone() {
return
}
- err = listener.connection.WriteMessage(websocket.TextMessage, body)
+ err = listener.Connection().WriteMessage(websocket.TextMessage, body)
if err != nil {
// Remove the listener from the list
devlxdEventsLock.Lock()
- delete(devlxdEventListeners[cid], listener.id)
+ delete(devlxdEventListeners[cid], listener.ID())
devlxdEventsLock.Unlock()
// Disconnect the listener
- listener.connection.Close()
- listener.active <- false
- listener.done = true
- logger.Debugf("Disconnected container event listener for '%s': %s", c.Name(), listener.id)
+ listener.Connection().Close()
+ listener.Deactivate()
+ logger.Debugf("Disconnected container event listener for '%s': %s", c.Name(), listener.ID())
}
}(listener, body)
}
diff --git a/lxd/events.go b/lxd/events.go
index 8040d20a4a..471e56c1c4 100644
--- a/lxd/events.go
+++ b/lxd/events.go
@@ -1,20 +1,12 @@
package main
import (
- "encoding/json"
- "fmt"
"net/http"
"strings"
- "sync"
- "time"
-
- "github.com/gorilla/websocket"
- log "github.com/lxc/lxd/shared/log15"
- "github.com/pborman/uuid"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/shared"
- "github.com/lxc/lxd/shared/api"
"github.com/lxc/lxd/shared/logger"
)
@@ -24,61 +16,6 @@ var eventsCmd = APIEndpoint{
Get: APIEndpointAction{Handler: eventsGet, AccessHandler: AllowAuthenticated},
}
-type eventsHandler struct {
-}
-
-func logContextMap(ctx []interface{}) map[string]string {
- var key string
- ctxMap := map[string]string{}
-
- for _, entry := range ctx {
- if key == "" {
- key = entry.(string)
- } else {
- ctxMap[key] = fmt.Sprintf("%v", entry)
- key = ""
- }
- }
-
- return ctxMap
-}
-
-func (h eventsHandler) Log(r *log.Record) error {
- eventSend("", "logging", api.EventLogging{
- Message: r.Msg,
- Level: r.Lvl.String(),
- Context: logContextMap(r.Ctx)})
- return nil
-}
-
-func eventSendLifecycle(project, action, source string,
- context map[string]interface{}) error {
- eventSend(project, "lifecycle", api.EventLifecycle{
- Action: action,
- Source: source,
- Context: context})
- return nil
-}
-
-var eventsLock sync.Mutex
-var eventListeners map[string]*eventListener = make(map[string]*eventListener)
-
-type eventListener struct {
- project string
- connection *websocket.Conn
- messageTypes []string
- active chan bool
- id string
- lock sync.Mutex
- done bool
- location string
-
- // If true, this listener won't get events forwarded from other
- // nodes. It only used by listeners created internally by LXD nodes
- // connecting to other LXD nodes to get their local events only.
- noForward bool
-}
-
type eventsServe struct {
req *http.Request
d *Daemon
@@ -117,27 +54,16 @@ func eventsSocket(d *Daemon, r *http.Request, w http.ResponseWriter) error {
return err
}
- listener := eventListener{
- project: project,
- active: make(chan bool, 1),
- connection: c,
- id: uuid.NewRandom().String(),
- messageTypes: strings.Split(typeStr, ","),
- location: serverName,
- }
-
// If this request is an internal one initiated by another node wanting
// to watch the events on this node, set the listener to broadcast only
// local events.
- listener.noForward = isClusterNotification(r)
+ listener := events.NewEventListener(project, c, strings.Split(typeStr, ","), serverName, isClusterNotification(r))
- eventsLock.Lock()
- eventListeners[listener.id] = &listener
- eventsLock.Unlock()
+ events.AddListener(listener)
- logger.Debugf("New event listener: %s", listener.id)
+ logger.Debugf("New event listener: %s", listener.ID())
- <-listener.active
+ listener.Wait()
return nil
}
@@ -145,110 +71,3 @@ func eventsSocket(d *Daemon, r *http.Request, w http.ResponseWriter) error {
func eventsGet(d *Daemon, r *http.Request) Response {
return &eventsServe{req: r, d: d}
}
-
-func eventSend(project, eventType string, eventMessage interface{}) error {
- encodedMessage, err := json.Marshal(eventMessage)
- if err != nil {
- return err
- }
- event := api.Event{
- Type: eventType,
- Timestamp: time.Now(),
- Metadata: encodedMessage,
- }
-
- return eventBroadcast(project, event, false)
-}
-
-func eventBroadcast(project string, event api.Event, isForward bool) error {
- eventsLock.Lock()
- listeners := eventListeners
- for _, listener := range listeners {
- if project != "" && listener.project != "*" && project != listener.project {
- continue
- }
-
- if isForward && listener.noForward {
- continue
- }
-
- if !shared.StringInSlice(event.Type, listener.messageTypes) {
- continue
- }
-
- go func(listener *eventListener, event api.Event) {
- // Check that the listener still exists
- if listener == nil {
- return
- }
-
- // Ensure there is only a single even going out at the time
- listener.lock.Lock()
- defer listener.lock.Unlock()
-
- // Make sure we're not done already
- if listener.done {
- return
- }
-
- // Set the Location to the expected serverName
- if event.Location == "" {
- eventCopy := api.Event{}
- err := shared.DeepCopy(&event, &eventCopy)
- if err != nil {
- return
- }
- eventCopy.Location = listener.location
-
- event = eventCopy
- }
-
- body, err := json.Marshal(event)
- if err != nil {
- return
- }
-
- err = listener.connection.WriteMessage(websocket.TextMessage, body)
- if err != nil {
- // Remove the listener from the list
- eventsLock.Lock()
- delete(eventListeners, listener.id)
- eventsLock.Unlock()
-
- // Disconnect the listener
- listener.connection.Close()
- listener.active <- false
- listener.done = true
- logger.Debugf("Disconnected event listener: %s", listener.id)
- }
- }(listener, event)
- }
- eventsLock.Unlock()
-
- return nil
-}
-
-// Forward to the local events dispatcher an event received from another node .
-func eventForward(id int64, event api.Event) {
- if event.Type == "logging" {
- // Parse the message
- logEntry := api.EventLogging{}
- err := json.Unmarshal(event.Metadata, &logEntry)
- if err != nil {
- return
- }
-
- if !debug && logEntry.Level == "dbug" {
- return
- }
-
- if !debug && !verbose && logEntry.Level == "info" {
- return
- }
- }
-
- err := eventBroadcast("", event, true)
- if err != nil {
- logger.Warnf("Failed to forward event from node %d: %v", id, err)
- }
-}
diff --git a/lxd/events/events.go b/lxd/events/events.go
new file mode 100644
index 0000000000..f2ae3fd5c8
--- /dev/null
+++ b/lxd/events/events.go
@@ -0,0 +1,259 @@
+package events
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/pborman/uuid"
+
+ "github.com/gorilla/websocket"
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
+ log "github.com/lxc/lxd/shared/log15"
+ "github.com/lxc/lxd/shared/logger"
+)
+
+var debug bool
+var verbose bool
+
+var eventsLock sync.Mutex
+var eventListeners map[string]*Listener = make(map[string]*Listener)
+
+// Listener describes an event listener.
+type Listener struct {
+ project string
+ connection *websocket.Conn
+ messageTypes []string
+ active chan bool
+ id string
+ lock sync.Mutex
+ done bool
+ location string
+
+ // If true, this listener won't get events forwarded from other
+ // nodes. It is only used by listeners created internally by LXD nodes
+ // connecting to other LXD nodes to get their local events only.
+ noForward bool
+}
+
+// NewEventListener creates and returns a new event listener.
+func NewEventListener(project string, connection *websocket.Conn, messageTypes []string, location string, noForward bool) *Listener {
+ return &Listener{
+ project: project,
+ connection: connection,
+ messageTypes: messageTypes,
+ location: location,
+ noForward: noForward,
+ active: make(chan bool, 1),
+ id: uuid.NewRandom().String(),
+ }
+}
+
+// MessageTypes returns a list of message types the listener will be notified of.
+func (e *Listener) MessageTypes() []string {
+ return e.messageTypes
+}
+
+// IsDone returns true if the listener is done.
+func (e *Listener) IsDone() bool {
+ return e.done
+}
+
+// Connection returns the underlying websocket connection.
+func (e *Listener) Connection() *websocket.Conn {
+ return e.connection
+}
+
+// ID returns the listener ID.
+func (e *Listener) ID() string {
+ return e.id
+}
+
+// Wait waits for a message on its active channel, then returns.
+func (e *Listener) Wait() {
+ <-e.active
+}
+
+// Lock locks the internal mutex.
+func (e *Listener) Lock() {
+ e.lock.Lock()
+}
+
+// Unlock unlocks the internal mutex.
+func (e *Listener) Unlock() {
+ e.lock.Unlock()
+}
+
+// Deactivate deactivates the event listener.
+func (e *Listener) Deactivate() {
+ e.active <- false
+ e.done = true
+}
+
+// Handler describes an event handler.
+type Handler struct {
+}
+
+// NewEventHandler creates and returns a new event handler.
+func NewEventHandler() *Handler {
+ return &Handler{}
+}
+
+// Log sends a new logging event.
+func (h Handler) Log(r *log.Record) error {
+ Send("", "logging", api.EventLogging{
+ Message: r.Msg,
+ Level: r.Lvl.String(),
+ Context: logContextMap(r.Ctx)})
+ return nil
+}
+
+// Init sets the debug and verbose flags.
+func Init(d bool, v bool) {
+ debug = d
+ verbose = v
+}
+
+// AddListener adds the given listener to the internal list of listeners which
+// are notified when events are broadcast.
+func AddListener(listener *Listener) {
+ eventsLock.Lock()
+ eventListeners[listener.id] = listener
+ eventsLock.Unlock()
+}
+
+// SendLifecycle broadcasts a lifecycle event.
+func SendLifecycle(project, action, source string,
+ context map[string]interface{}) error {
+ Send(project, "lifecycle", api.EventLifecycle{
+ Action: action,
+ Source: source,
+ Context: context})
+ return nil
+}
+
+// Send broadcasts a custom event.
+func Send(project, eventType string, eventMessage interface{}) error {
+ encodedMessage, err := json.Marshal(eventMessage)
+ if err != nil {
+ return err
+ }
+ event := api.Event{
+ Type: eventType,
+ Timestamp: time.Now(),
+ Metadata: encodedMessage,
+ }
+
+ return broadcast(project, event, false)
+}
+
+// Forward to the local events dispatcher an event received from another node.
+func Forward(id int64, event api.Event) {
+ if event.Type == "logging" {
+ // Parse the message
+ logEntry := api.EventLogging{}
+ err := json.Unmarshal(event.Metadata, &logEntry)
+ if err != nil {
+ return
+ }
+
+ if !debug && logEntry.Level == "dbug" {
+ return
+ }
+
+ if !debug && !verbose && logEntry.Level == "info" {
+ return
+ }
+ }
+
+ err := broadcast("", event, true)
+ if err != nil {
+ logger.Warnf("Failed to forward event from node %d: %v", id, err)
+ }
+}
+
+func logContextMap(ctx []interface{}) map[string]string {
+ var key string
+ ctxMap := map[string]string{}
+
+ for _, entry := range ctx {
+ if key == "" {
+ key = entry.(string)
+ } else {
+ ctxMap[key] = fmt.Sprintf("%v", entry)
+ key = ""
+ }
+ }
+
+ return ctxMap
+}
+
+func broadcast(project string, event api.Event, isForward bool) error {
+ eventsLock.Lock()
+ listeners := eventListeners
+ for _, listener := range listeners {
+ if project != "" && listener.project != "*" && project != listener.project {
+ continue
+ }
+
+ if isForward && listener.noForward {
+ continue
+ }
+
+ if !shared.StringInSlice(event.Type, listener.messageTypes) {
+ continue
+ }
+
+ go func(listener *Listener, event api.Event) {
+ // Check that the listener still exists
+ if listener == nil {
+ return
+ }
+
+ // Ensure there is only a single event going out at a time
+ listener.lock.Lock()
+ defer listener.lock.Unlock()
+
+ // Make sure we're not done already
+ if listener.done {
+ return
+ }
+
+ // Set the Location to the expected serverName
+ if event.Location == "" {
+ eventCopy := api.Event{}
+ err := shared.DeepCopy(&event, &eventCopy)
+ if err != nil {
+ return
+ }
+ eventCopy.Location = listener.location
+
+ event = eventCopy
+ }
+
+ body, err := json.Marshal(event)
+ if err != nil {
+ return
+ }
+
+ err = listener.connection.WriteMessage(websocket.TextMessage, body)
+ if err != nil {
+ // Remove the listener from the list
+ eventsLock.Lock()
+ delete(eventListeners, listener.id)
+ eventsLock.Unlock()
+
+ // Disconnect the listener
+ listener.connection.Close()
+ listener.active <- false
+ listener.done = true
+ logger.Debugf("Disconnected event listener: %s", listener.id)
+ }
+ }(listener, event)
+ }
+ eventsLock.Unlock()
+
+ return nil
+}
diff --git a/lxd/main.go b/lxd/main.go
index af52a93423..476ad9ecb4 100644
--- a/lxd/main.go
+++ b/lxd/main.go
@@ -7,6 +7,7 @@ import (
"github.com/spf13/cobra"
+ "github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/shared/logger"
"github.com/lxc/lxd/shared/logging"
"github.com/lxc/lxd/shared/version"
@@ -39,14 +40,16 @@ func (c *cmdGlobal) Run(cmd *cobra.Command, args []string) error {
debug = c.flagLogDebug
verbose = c.flagLogVerbose
+ // Set debug and verbose for the events package
+ events.Init(debug, verbose)
+
// Setup logger
syslog := ""
if c.flagLogSyslog {
syslog = "lxd"
}
- handler := eventsHandler{}
- log, err := logging.GetLogger(syslog, c.flagLogFile, c.flagLogVerbose, c.flagLogDebug, handler)
+ log, err := logging.GetLogger(syslog, c.flagLogFile, c.flagLogVerbose, c.flagLogDebug, events.NewEventHandler())
if err != nil {
return err
}
diff --git a/lxd/main_forkdns.go b/lxd/main_forkdns.go
index e339593c49..0ba1d322b9 100644
--- a/lxd/main_forkdns.go
+++ b/lxd/main_forkdns.go
@@ -14,6 +14,7 @@ import (
"github.com/spf13/cobra"
"gopkg.in/fsnotify.v0"
+ "github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/dnsutil"
"github.com/lxc/lxd/shared/logger"
@@ -442,7 +443,7 @@ func (c *cmdForkDNS) Run(cmd *cobra.Command, args []string) error {
return fmt.Errorf("Missing required arguments")
}
- log, err := logging.GetLogger("lxd-forkdns", "", false, false, eventsHandler{})
+ log, err := logging.GetLogger("lxd-forkdns", "", false, false, events.NewEventHandler())
if err != nil {
return err
}
diff --git a/lxd/operations.go b/lxd/operations.go
index d0908e31e3..c96409b58e 100644
--- a/lxd/operations.go
+++ b/lxd/operations.go
@@ -14,6 +14,7 @@ import (
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/lxd/node"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -154,7 +155,7 @@ func (op *operation) Run() (chan error, error) {
logger.Debugf("Failure for %s operation: %s: %s", op.class.String(), op.id, err)
_, md, _ := op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
return
}
@@ -167,7 +168,7 @@ func (op *operation) Run() (chan error, error) {
op.lock.Lock()
logger.Debugf("Success for %s operation: %s", op.class.String(), op.id)
_, md, _ := op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
op.lock.Unlock()
}(op, chanRun)
}
@@ -175,7 +176,7 @@ func (op *operation) Run() (chan error, error) {
logger.Debugf("Started %s operation: %s", op.class.String(), op.id)
_, md, _ := op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
return chanRun, nil
}
@@ -207,7 +208,7 @@ func (op *operation) Cancel() (chan error, error) {
logger.Debugf("Failed to cancel %s operation: %s: %s", op.class.String(), op.id, err)
_, md, _ := op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
return
}
@@ -219,13 +220,13 @@ func (op *operation) Cancel() (chan error, error) {
logger.Debugf("Cancelled %s operation: %s", op.class.String(), op.id)
_, md, _ := op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
}(op, oldStatus, chanCancel)
}
logger.Debugf("Cancelling %s operation: %s", op.class.String(), op.id)
_, md, _ := op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
if op.canceler != nil {
err := op.canceler.Cancel()
@@ -244,7 +245,7 @@ func (op *operation) Cancel() (chan error, error) {
logger.Debugf("Cancelled %s operation: %s", op.class.String(), op.id)
_, md, _ = op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
return chanCancel, nil
}
@@ -383,7 +384,7 @@ func (op *operation) UpdateResources(opResources map[string][]string) error {
logger.Debugf("Updated resources for %s operation: %s", op.class.String(), op.id)
_, md, _ := op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
return nil
}
@@ -409,7 +410,7 @@ func (op *operation) UpdateMetadata(opMetadata interface{}) error {
logger.Debugf("Updated metadata for %s operation: %s", op.class.String(), op.id)
_, md, _ := op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
return nil
}
@@ -472,7 +473,7 @@ func operationCreate(cluster *db.Cluster, project string, opClass operationClass
logger.Debugf("New %s operation: %s", op.class.String(), op.id)
_, md, _ := op.Render()
- eventSend(op.project, "operation", md)
+ events.Send(op.project, "operation", md)
return &op, nil
}
From 841007fabda56bb6b4755a0606a137bddadd711b Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Wed, 25 Sep 2019 09:39:50 +0200
Subject: [PATCH 2/2] test: Add events package to static analysis test
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
test/suites/static_analysis.sh | 1 +
1 file changed, 1 insertion(+)
diff --git a/test/suites/static_analysis.sh b/test/suites/static_analysis.sh
index 8f40270444..21291beeb6 100644
--- a/test/suites/static_analysis.sh
+++ b/test/suites/static_analysis.sh
@@ -78,6 +78,7 @@ test_static_analysis() {
golint -set_exit_status lxd/db/query
golint -set_exit_status lxd/db/schema
golint -set_exit_status lxd/endpoints
+ golint -set_exit_status lxd/events
golint -set_exit_status lxd/maas
#golint -set_exit_status lxd/migration
golint -set_exit_status lxd/node
More information about the lxc-devel
mailing list