[lxc-devel] [lxd/master] Add /1.0/events to the /dev/lxd API (and some cleanup and fixes)
stgraber on Github
lxc-bot at linuxcontainers.org
Thu Dec 21 22:52:03 UTC 2017
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20171221/a67c05af/attachment.bin>
-------------- next part --------------
From a6eaf97fde1c8392c7cd6f8ecbafa11a9db00395 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 21 Dec 2017 17:49:44 -0500
Subject: [PATCH 1/3] lxd/containers: Fix tc egress rules
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/container_lxc.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index bed1fb40f..b31807aa9 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -7564,7 +7564,7 @@ func (c *containerLXC) setNetworkLimits(name string, m types.Device) error {
return fmt.Errorf("Failed to create ingress tc qdisc: %s", out)
}
- out, err = shared.RunCommand("tc", "filter", "add", "dev", veth, "parent", "ffff:0", "protocol", "all", "u32", "match", "u32", "0", "0", "police", "rate", fmt.Sprintf("%dbit", egressInt), "burst", "1024k", "mtu", "64kb", "drop", "flowid", ":1")
+ out, err = shared.RunCommand("tc", "filter", "add", "dev", veth, "parent", "ffff:0", "protocol", "all", "u32", "match", "u32", "0", "0", "police", "rate", fmt.Sprintf("%dbit", egressInt), "burst", "1024k", "mtu", "64kb", "drop")
if err != nil {
return fmt.Errorf("Failed to create ingress tc qdisc: %s", out)
}
From e975a1f82df132bf149fe67dce8fc8dbb59e6bf8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 21 Dec 2017 17:50:14 -0500
Subject: [PATCH 2/3] lxd/events: Cleanup event listener setup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/events.go | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/lxd/events.go b/lxd/events.go
index 4d0c1e9b5..f79b03fcc 100644
--- a/lxd/events.go
+++ b/lxd/events.go
@@ -68,8 +68,6 @@ func (r *eventsServe) String() string {
}
func eventsSocket(r *http.Request, w http.ResponseWriter) error {
- listener := eventListener{}
-
typeStr := r.FormValue("type")
if typeStr == "" {
typeStr = "logging,operation"
@@ -80,16 +78,18 @@ func eventsSocket(r *http.Request, w http.ResponseWriter) error {
return err
}
- listener.active = make(chan bool, 1)
- listener.connection = c
- listener.id = uuid.NewRandom().String()
- listener.messageTypes = strings.Split(typeStr, ",")
+ listener := eventListener{
+ active: make(chan bool, 1),
+ connection: c,
+ id: uuid.NewRandom().String(),
+ messageTypes: strings.Split(typeStr, ","),
+ }
eventsLock.Lock()
eventListeners[listener.id] = &listener
eventsLock.Unlock()
- logger.Debugf("New events listener: %s", listener.id)
+ logger.Debugf("New event listener: %s", listener.id)
<-listener.active
@@ -146,7 +146,7 @@ func eventSend(eventType string, eventMessage interface{}) error {
listener.connection.Close()
listener.active <- false
listener.done = true
- logger.Debugf("Disconnected events listener: %s", listener.id)
+ logger.Debugf("Disconnected event listener: %s", listener.id)
}
}(listener, body)
}
From 5fb338e214179b7f678291287cf4bd5007a8d69b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 21 Dec 2017 17:50:53 -0500
Subject: [PATCH 3/3] Add /1.0/events to the /dev/lxd/sock API
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Closes #3996
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/container_lxc.go | 61 +++++++++++++++++++++++++++++++
lxd/devlxd.go | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 161 insertions(+)
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index b31807aa9..05d10efe1 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -4415,6 +4415,67 @@ func (c *containerLXC) Update(args db.ContainerArgs, userRequested bool) error {
networkUpdateStatic(c.state, "")
}
+ // Send devlxd notifications
+ if isRunning {
+ // Config changes (only for user.* keys
+ for _, key := range changedConfig {
+ if !strings.HasPrefix(key, "user.") {
+ continue
+ }
+
+ msg := map[string]string{
+ "key": key,
+ "old_value": oldExpandedConfig[key],
+ "value": c.expandedConfig[key],
+ }
+
+ err = devlxdEventSend(c, "config", msg)
+ if err != nil {
+ return err
+ }
+ }
+
+ // Device changes
+ for k, m := range removeDevices {
+ msg := map[string]interface{}{
+ "action": "removed",
+ "name": k,
+ "config": m,
+ }
+
+ err = devlxdEventSend(c, "device", msg)
+ if err != nil {
+ return err
+ }
+ }
+
+ for k, m := range updateDevices {
+ msg := map[string]interface{}{
+ "action": "updated",
+ "name": k,
+ "config": m,
+ }
+
+ err = devlxdEventSend(c, "device", msg)
+ if err != nil {
+ return err
+ }
+ }
+
+ for k, m := range addDevices {
+ msg := map[string]interface{}{
+ "action": "added",
+ "name": k,
+ "config": m,
+ }
+
+ err = devlxdEventSend(c, "device", msg)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
// Success, update the closure to mark that the changes should be kept.
undoChanges = false
diff --git a/lxd/devlxd.go b/lxd/devlxd.go
index b1cb16ea9..b38138c73 100644
--- a/lxd/devlxd.go
+++ b/lxd/devlxd.go
@@ -1,6 +1,7 @@
package main
import (
+ "encoding/json"
"fmt"
"io/ioutil"
"net"
@@ -11,9 +12,12 @@ import (
"strconv"
"strings"
"sync"
+ "time"
"unsafe"
"github.com/gorilla/mux"
+ "github.com/gorilla/websocket"
+ "github.com/pborman/uuid"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/util"
@@ -82,6 +86,101 @@ var devlxdMetadataGet = devLxdHandler{"/1.0/meta-data", func(c container, w http
return okResponse(fmt.Sprintf("#cloud-config\ninstance-id: %s\nlocal-hostname: %s\n%s", c.Name(), c.Name(), value), "raw")
}}
+var devlxdEventsLock sync.Mutex
+var devlxdEventListeners map[int]map[string]*eventListener = make(map[int]map[string]*eventListener)
+
+var devlxdEventsGet = devLxdHandler{"/1.0/events", func(c container, w http.ResponseWriter, r *http.Request) *devLxdResponse {
+ typeStr := r.FormValue("type")
+ if typeStr == "" {
+ typeStr = "config,device"
+ }
+
+ conn, err := shared.WebsocketUpgrader.Upgrade(w, r, nil)
+ if err != nil {
+ return &devLxdResponse{"internal server error", http.StatusInternalServerError, "raw"}
+ }
+
+ listener := eventListener{
+ active: make(chan bool, 1),
+ connection: conn,
+ id: uuid.NewRandom().String(),
+ messageTypes: strings.Split(typeStr, ","),
+ }
+
+ devlxdEventsLock.Lock()
+ cid := c.Id()
+ _, ok := devlxdEventListeners[cid]
+ if !ok {
+ devlxdEventListeners[cid] = map[string]*eventListener{}
+ }
+ devlxdEventListeners[cid][listener.id] = &listener
+ devlxdEventsLock.Unlock()
+
+ logger.Debugf("New container event listener for '%s': %s", c.Name(), listener.id)
+
+ <-listener.active
+
+ return &devLxdResponse{"websocket", http.StatusOK, "websocket"}
+}}
+
+func devlxdEventSend(c container, eventType string, eventMessage interface{}) error {
+ event := shared.Jmap{}
+ event["type"] = eventType
+ event["timestamp"] = time.Now()
+ event["metadata"] = eventMessage
+
+ body, err := json.Marshal(event)
+ if err != nil {
+ return err
+ }
+
+ devlxdEventsLock.Lock()
+ cid := c.Id()
+ listeners, ok := devlxdEventListeners[cid]
+ if !ok {
+ return nil
+ }
+
+ for _, listener := range listeners {
+ if !shared.StringInSlice(eventType, listener.messageTypes) {
+ continue
+ }
+
+ go func(listener *eventListener, body []byte) {
+ // Check that the listener still exists
+ if listener == nil {
+ return
+ }
+
+ // Ensure there is only a single even going out at the time
+ listener.lock.Lock()
+ defer listener.lock.Unlock()
+
+ // Make sure we're not done already
+ if listener.done {
+ return
+ }
+
+ err = listener.connection.WriteMessage(websocket.TextMessage, body)
+ if err != nil {
+ // Remove the listener from the list
+ devlxdEventsLock.Lock()
+ delete(devlxdEventListeners[cid], listener.id)
+ devlxdEventsLock.Unlock()
+
+ // Disconnect the listener
+ listener.connection.Close()
+ listener.active <- false
+ listener.done = true
+ logger.Debugf("Disconnected container event listener for '%s': %s", c.Name(), listener.id)
+ }
+ }(listener, body)
+ }
+ devlxdEventsLock.Unlock()
+
+ return nil
+}
+
var handlers = []devLxdHandler{
{"/", func(c container, w http.ResponseWriter, r *http.Request) *devLxdResponse {
return okResponse([]string{"/1.0"}, "json")
@@ -92,6 +191,7 @@ var handlers = []devLxdHandler{
devlxdConfigGet,
devlxdConfigKeyGet,
devlxdMetadataGet,
+ devlxdEventsGet,
}
func hoistReq(f func(container, http.ResponseWriter, *http.Request) *devLxdResponse, d *Daemon) func(http.ResponseWriter, *http.Request) {
More information about the lxc-devel
mailing list