Commit b7827962 by Torkel Ödegaard

feat(live): just wanted to checkout how far I got on the websocket data source

parent e81e4ad0
...@@ -48,6 +48,7 @@ func (c *connection) readPump() { ...@@ -48,6 +48,7 @@ func (c *connection) readPump() {
h.unregister <- c h.unregister <- c
c.ws.Close() c.ws.Close()
}() }()
c.ws.SetReadLimit(maxMessageSize) c.ws.SetReadLimit(maxMessageSize)
c.ws.SetReadDeadline(time.Now().Add(pongWait)) c.ws.SetReadDeadline(time.Now().Add(pongWait))
c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil }) c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
) )
type hub struct { type hub struct {
log log.Logger
connections map[*connection]bool connections map[*connection]bool
streams map[string]map[*connection]bool streams map[string]map[*connection]bool
...@@ -29,10 +30,10 @@ var h = hub{ ...@@ -29,10 +30,10 @@ var h = hub{
unregister: make(chan *connection), unregister: make(chan *connection),
streamChannel: make(chan *dtos.StreamMessage), streamChannel: make(chan *dtos.StreamMessage),
subChannel: make(chan *streamSubscription), subChannel: make(chan *streamSubscription),
log: log.New("live.hub"),
} }
func (h *hub) removeConnection() { func (h *hub) removeConnection() {
} }
func (h *hub) run() { func (h *hub) run() {
...@@ -40,17 +41,17 @@ func (h *hub) run() { ...@@ -40,17 +41,17 @@ func (h *hub) run() {
select { select {
case c := <-h.register: case c := <-h.register:
h.connections[c] = true h.connections[c] = true
log.Info("Live: New connection (Total count: %v)", len(h.connections)) h.log.Info("New connection", "total", len(h.connections))
case c := <-h.unregister: case c := <-h.unregister:
if _, ok := h.connections[c]; ok { if _, ok := h.connections[c]; ok {
log.Info("Live: Closing Connection (Total count: %v)", len(h.connections)) h.log.Info("Closing connection", "total", len(h.connections))
delete(h.connections, c) delete(h.connections, c)
close(c.send) close(c.send)
} }
// hand stream subscriptions // hand stream subscriptions
case sub := <-h.subChannel: case sub := <-h.subChannel:
log.Info("Live: Subscribing to: %v, remove: %v", sub.name, sub.remove) h.log.Info("Subscribing", "channel", sub.name, "remove", sub.remove)
subscribers, exists := h.streams[sub.name] subscribers, exists := h.streams[sub.name]
// handle unsubscribe // handle unsubscribe
...@@ -63,13 +64,14 @@ func (h *hub) run() { ...@@ -63,13 +64,14 @@ func (h *hub) run() {
subscribers = make(map[*connection]bool) subscribers = make(map[*connection]bool)
h.streams[sub.name] = subscribers h.streams[sub.name] = subscribers
} }
subscribers[sub.conn] = true subscribers[sub.conn] = true
// handle stream messages // handle stream messages
case message := <-h.streamChannel: case message := <-h.streamChannel:
subscribers, exists := h.streams[message.Stream] subscribers, exists := h.streams[message.Stream]
if !exists || len(subscribers) == 0 { if !exists || len(subscribers) == 0 {
log.Info("Live: Message to stream without subscribers: %v", message.Stream) h.log.Info("Message to stream without subscribers", "stream", message.Stream)
continue continue
} }
......
...@@ -9,23 +9,27 @@ import ( ...@@ -9,23 +9,27 @@ import (
) )
type LiveConn struct { type LiveConn struct {
log log.Logger
} }
func New() *LiveConn { func New() *LiveConn {
go h.run() go h.run()
return &LiveConn{}
return &LiveConn{log: log.New("live.server")}
} }
func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) { func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) {
log.Info("Live: Upgrading to WebSocket") lc.log.Info("Upgrading to WebSocket")
ws, err := upgrader.Upgrade(w, r, nil) ws, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
log.Error(3, "Live: Failed to upgrade connection to WebSocket", err) log.Error(3, "Live: Failed to upgrade connection to WebSocket", err)
return return
} }
c := newConnection(ws) c := newConnection(ws)
h.register <- c h.register <- c
go c.writePump() go c.writePump()
c.readPump() c.readPump()
} }
......
...@@ -47,6 +47,14 @@ class MetricsPanelCtrl extends PanelCtrl { ...@@ -47,6 +47,14 @@ class MetricsPanelCtrl extends PanelCtrl {
this.events.on('refresh', this.onMetricsPanelRefresh.bind(this)); this.events.on('refresh', this.onMetricsPanelRefresh.bind(this));
this.events.on('init-edit-mode', this.onInitMetricsPanelEditMode.bind(this)); this.events.on('init-edit-mode', this.onInitMetricsPanelEditMode.bind(this));
this.events.on('panel-teardown', this.onPanelTearDown.bind(this));
}
private onPanelTearDown() {
if (this.dataSubscription) {
this.dataSubscription.unsubscribe();
this.dataSubscription = null;
}
} }
private onInitMetricsPanelEditMode() { private onInitMetricsPanelEditMode() {
......
{
"type": "datasource",
"name": "Grafana Live",
"id": "grafana-live",
"metrics": true
}
<query-editor-row ctrl="ctrl"> <query-editor-row query-ctrl="ctrl" can-collapse="false">
<li class="tight-form-item"> <div class="gf-form-inline">
Stream <div class="gf-form gf-form--grow">
</li> <label class="gf-form-label width-8">Stream</label>
<li> <input type="text" class="gf-form-input" ng-model="ctrl.target.stream" spellcheck='false' placeholder="metric">
<input type="text" class="tight-form-input input-large" ng-model="ctrl.target.stream"> </div>
</li> </div>
</query-editor-row> </query-editor-row>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment