[lxc-devel] [lxd/master] Add /1.0/events to the /dev/lxd API (and some cleanup and fixes)

stgraber on Github lxc-bot at linuxcontainers.org
Thu Dec 21 22:52:03 UTC 2017


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20171221/a67c05af/attachment.bin>
-------------- next part --------------
From a6eaf97fde1c8392c7cd6f8ecbafa11a9db00395 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 21 Dec 2017 17:49:44 -0500
Subject: [PATCH 1/3] lxd/containers: Fix tc egress rules
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/container_lxc.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index bed1fb40f..b31807aa9 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -7564,7 +7564,7 @@ func (c *containerLXC) setNetworkLimits(name string, m types.Device) error {
 			return fmt.Errorf("Failed to create ingress tc qdisc: %s", out)
 		}
 
-		out, err = shared.RunCommand("tc", "filter", "add", "dev", veth, "parent", "ffff:0", "protocol", "all", "u32", "match", "u32", "0", "0", "police", "rate", fmt.Sprintf("%dbit", egressInt), "burst", "1024k", "mtu", "64kb", "drop", "flowid", ":1")
+		out, err = shared.RunCommand("tc", "filter", "add", "dev", veth, "parent", "ffff:0", "protocol", "all", "u32", "match", "u32", "0", "0", "police", "rate", fmt.Sprintf("%dbit", egressInt), "burst", "1024k", "mtu", "64kb", "drop")
 		if err != nil {
 			return fmt.Errorf("Failed to create ingress tc qdisc: %s", out)
 		}

From e975a1f82df132bf149fe67dce8fc8dbb59e6bf8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 21 Dec 2017 17:50:14 -0500
Subject: [PATCH 2/3] lxd/events: Cleanup event listener setup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/events.go | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/lxd/events.go b/lxd/events.go
index 4d0c1e9b5..f79b03fcc 100644
--- a/lxd/events.go
+++ b/lxd/events.go
@@ -68,8 +68,6 @@ func (r *eventsServe) String() string {
 }
 
 func eventsSocket(r *http.Request, w http.ResponseWriter) error {
-	listener := eventListener{}
-
 	typeStr := r.FormValue("type")
 	if typeStr == "" {
 		typeStr = "logging,operation"
@@ -80,16 +78,18 @@ func eventsSocket(r *http.Request, w http.ResponseWriter) error {
 		return err
 	}
 
-	listener.active = make(chan bool, 1)
-	listener.connection = c
-	listener.id = uuid.NewRandom().String()
-	listener.messageTypes = strings.Split(typeStr, ",")
+	listener := eventListener{
+		active:       make(chan bool, 1),
+		connection:   c,
+		id:           uuid.NewRandom().String(),
+		messageTypes: strings.Split(typeStr, ","),
+	}
 
 	eventsLock.Lock()
 	eventListeners[listener.id] = &listener
 	eventsLock.Unlock()
 
-	logger.Debugf("New events listener: %s", listener.id)
+	logger.Debugf("New event listener: %s", listener.id)
 
 	<-listener.active
 
@@ -146,7 +146,7 @@ func eventSend(eventType string, eventMessage interface{}) error {
 				listener.connection.Close()
 				listener.active <- false
 				listener.done = true
-				logger.Debugf("Disconnected events listener: %s", listener.id)
+				logger.Debugf("Disconnected event listener: %s", listener.id)
 			}
 		}(listener, body)
 	}

From 5fb338e214179b7f678291287cf4bd5007a8d69b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 21 Dec 2017 17:50:53 -0500
Subject: [PATCH 3/3] Add /1.0/events to the /dev/lxd/sock API
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes #3996

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/container_lxc.go |  61 +++++++++++++++++++++++++++++++
 lxd/devlxd.go        | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 161 insertions(+)

diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index b31807aa9..05d10efe1 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -4415,6 +4415,67 @@ func (c *containerLXC) Update(args db.ContainerArgs, userRequested bool) error {
 		networkUpdateStatic(c.state, "")
 	}
 
+	// Send devlxd notifications
+	if isRunning {
+		// Config changes (only for user.* keys)
+		for _, key := range changedConfig {
+			if !strings.HasPrefix(key, "user.") {
+				continue
+			}
+
+			msg := map[string]string{
+				"key":       key,
+				"old_value": oldExpandedConfig[key],
+				"value":     c.expandedConfig[key],
+			}
+
+			err = devlxdEventSend(c, "config", msg)
+			if err != nil {
+				return err
+			}
+		}
+
+		// Device changes
+		for k, m := range removeDevices {
+			msg := map[string]interface{}{
+				"action": "removed",
+				"name":   k,
+				"config": m,
+			}
+
+			err = devlxdEventSend(c, "device", msg)
+			if err != nil {
+				return err
+			}
+		}
+
+		for k, m := range updateDevices {
+			msg := map[string]interface{}{
+				"action": "updated",
+				"name":   k,
+				"config": m,
+			}
+
+			err = devlxdEventSend(c, "device", msg)
+			if err != nil {
+				return err
+			}
+		}
+
+		for k, m := range addDevices {
+			msg := map[string]interface{}{
+				"action": "added",
+				"name":   k,
+				"config": m,
+			}
+
+			err = devlxdEventSend(c, "device", msg)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
 	// Success, update the closure to mark that the changes should be kept.
 	undoChanges = false
 
diff --git a/lxd/devlxd.go b/lxd/devlxd.go
index b1cb16ea9..b38138c73 100644
--- a/lxd/devlxd.go
+++ b/lxd/devlxd.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"net"
@@ -11,9 +12,12 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 	"unsafe"
 
 	"github.com/gorilla/mux"
+	"github.com/gorilla/websocket"
+	"github.com/pborman/uuid"
 
 	"github.com/lxc/lxd/lxd/db"
 	"github.com/lxc/lxd/lxd/util"
@@ -82,6 +86,101 @@ var devlxdMetadataGet = devLxdHandler{"/1.0/meta-data", func(c container, w http
 	return okResponse(fmt.Sprintf("#cloud-config\ninstance-id: %s\nlocal-hostname: %s\n%s", c.Name(), c.Name(), value), "raw")
 }}
 
+var devlxdEventsLock sync.Mutex // Guards devlxdEventListeners
+var devlxdEventListeners map[int]map[string]*eventListener = make(map[int]map[string]*eventListener) // Keyed by container ID, then by listener ID
+// devlxdEventsGet upgrades the request to a websocket and registers it as an event listener for container c.
+var devlxdEventsGet = devLxdHandler{"/1.0/events", func(c container, w http.ResponseWriter, r *http.Request) *devLxdResponse {
+	typeStr := r.FormValue("type")
+	if typeStr == "" {
+		typeStr = "config,device" // Default subscription when no "type" value is given
+	}
+
+	conn, err := shared.WebsocketUpgrader.Upgrade(w, r, nil)
+	if err != nil {
+		return &devLxdResponse{"internal server error", http.StatusInternalServerError, "raw"}
+	}
+
+	listener := eventListener{
+		active:       make(chan bool, 1), // Capacity 1: a single send succeeds without a waiting receiver
+		connection:   conn,
+		id:           uuid.NewRandom().String(),
+		messageTypes: strings.Split(typeStr, ","), // Comma-separated list of event types to receive
+	}
+
+	devlxdEventsLock.Lock()
+	cid := c.Id()
+	_, ok := devlxdEventListeners[cid]
+	if !ok {
+		devlxdEventListeners[cid] = map[string]*eventListener{} // First listener for this container
+	}
+	devlxdEventListeners[cid][listener.id] = &listener
+	devlxdEventsLock.Unlock()
+
+	logger.Debugf("New container event listener for '%s': %s", c.Name(), listener.id)
+
+	<-listener.active // Block until devlxdEventSend signals that this listener was disconnected
+
+	return &devLxdResponse{"websocket", http.StatusOK, "websocket"}
+}}
+
+// devlxdEventSend asynchronously sends an event to the listeners of container c subscribed to eventType.
+func devlxdEventSend(c container, eventType string, eventMessage interface{}) error {
+	event := shared.Jmap{}
+	event["type"] = eventType
+	event["timestamp"] = time.Now()
+	event["metadata"] = eventMessage
+
+	body, err := json.Marshal(event)
+	if err != nil {
+		return err
+	}
+
+	devlxdEventsLock.Lock()
+	defer devlxdEventsLock.Unlock() // Also released on the early return below
+	cid := c.Id()
+	listeners, ok := devlxdEventListeners[cid]
+	if !ok {
+		return nil
+	}
+
+	for _, listener := range listeners {
+		if !shared.StringInSlice(eventType, listener.messageTypes) {
+			continue
+		}
+
+		go func(listener *eventListener, body []byte) {
+			// Guard against a nil listener entry
+			if listener == nil {
+				return
+			}
+
+			// Ensure there is only a single event going out at a time
+			listener.lock.Lock()
+			defer listener.lock.Unlock()
+
+			// Make sure we're not done already
+			if listener.done {
+				return
+			}
+
+			// err is scoped to this goroutine; sharing the outer err would be a data race
+			if err := listener.connection.WriteMessage(websocket.TextMessage, body); err != nil {
+				// Remove the listener from the list
+				devlxdEventsLock.Lock()
+				delete(devlxdEventListeners[cid], listener.id)
+				devlxdEventsLock.Unlock()
+
+				// Disconnect the listener
+				listener.connection.Close()
+				listener.active <- false
+				listener.done = true
+				logger.Debugf("Disconnected container event listener for '%s': %s", c.Name(), listener.id)
+			}
+		}(listener, body)
+	}
+	return nil
+}
+
 var handlers = []devLxdHandler{
 	{"/", func(c container, w http.ResponseWriter, r *http.Request) *devLxdResponse {
 		return okResponse([]string{"/1.0"}, "json")
@@ -92,6 +191,7 @@ var handlers = []devLxdHandler{
 	devlxdConfigGet,
 	devlxdConfigKeyGet,
 	devlxdMetadataGet,
+	devlxdEventsGet,
 }
 
 func hoistReq(f func(container, http.ResponseWriter, *http.Request) *devLxdResponse, d *Daemon) func(http.ResponseWriter, *http.Request) {


More information about the lxc-devel mailing list