Commit 2e1f2609 by Torkel Ödegaard

feat(grafana_live): more work pushing data to websocket, #4355

parent 92f20b9b
...@@ -4,6 +4,10 @@ import "encoding/json" ...@@ -4,6 +4,10 @@ import "encoding/json"
type StreamMessage struct { type StreamMessage struct {
Stream string `json:"stream"` Stream string `json:"stream"`
Metric string `json:"metric"` Series []StreamMessageSeries `json:"series"`
Datapoints [][]json.Number `json:"Datapoints"` }
type StreamMessageSeries struct {
Name string `json:"name"`
Datapoints [][]json.Number `json:"datapoints"`
} }
...@@ -81,7 +81,10 @@ func (c *connection) handleMessage(message []byte) { ...@@ -81,7 +81,10 @@ func (c *connection) handleMessage(message []byte) {
switch msgType { switch msgType {
case "subscribe": case "subscribe":
h.subChannel <- &streamSubscription{name: streamName, conn: c} h.subChannel <- &streamSubscription{name: streamName, conn: c}
case "unsubscribe":
h.subChannel <- &streamSubscription{name: streamName, conn: c, remove: true}
} }
} }
func (c *connection) write(mt int, payload []byte) error { func (c *connection) write(mt int, payload []byte) error {
......
...@@ -19,6 +19,7 @@ type hub struct { ...@@ -19,6 +19,7 @@ type hub struct {
type streamSubscription struct { type streamSubscription struct {
conn *connection conn *connection
name string name string
remove bool
} }
var h = hub{ var h = hub{
...@@ -40,15 +41,24 @@ func (h *hub) run() { ...@@ -40,15 +41,24 @@ func (h *hub) run() {
case c := <-h.register: case c := <-h.register:
h.connections[c] = true h.connections[c] = true
log.Info("Live: New connection (Total count: %v)", len(h.connections)) log.Info("Live: New connection (Total count: %v)", len(h.connections))
case c := <-h.unregister: case c := <-h.unregister:
if _, ok := h.connections[c]; ok { if _, ok := h.connections[c]; ok {
log.Info("Live: Closing Connection (Total count: %v)", len(h.connections))
delete(h.connections, c) delete(h.connections, c)
close(c.send) close(c.send)
} }
// hand stream subscriptions // hand stream subscriptions
case sub := <-h.subChannel: case sub := <-h.subChannel:
log.Info("Live: Connection subscribing to: %v", sub.name) log.Info("Live: Subscribing to: %v, remove: %v", sub.name, sub.remove)
subscribers, exists := h.streams[sub.name] subscribers, exists := h.streams[sub.name]
// handle unsubscribe
if exists && sub.remove {
delete(subscribers, sub.conn)
continue
}
if !exists { if !exists {
subscribers = make(map[*connection]bool) subscribers = make(map[*connection]bool)
h.streams[sub.name] = subscribers h.streams[sub.name] = subscribers
...@@ -58,13 +68,19 @@ func (h *hub) run() { ...@@ -58,13 +68,19 @@ func (h *hub) run() {
// handle stream messages // handle stream messages
case message := <-h.streamChannel: case message := <-h.streamChannel:
subscribers, exists := h.streams[message.Stream] subscribers, exists := h.streams[message.Stream]
if !exists { if !exists || len(subscribers) == 0 {
log.Info("Live: Message to stream without subscribers: %v", message.Stream) log.Info("Live: Message to stream without subscribers: %v", message.Stream)
continue continue
} }
messageBytes, _ := simplejson.NewFromAny(message).Encode() messageBytes, _ := simplejson.NewFromAny(message).Encode()
for sub := range subscribers { for sub := range subscribers {
// check if channel is open
if _, ok := h.connections[sub]; !ok {
delete(subscribers, sub)
continue
}
select { select {
case sub.send <- messageBytes: case sub.send <- messageBytes:
default: default:
......
...@@ -42,7 +42,7 @@ export class LiveSrv { ...@@ -42,7 +42,7 @@ export class LiveSrv {
}; };
this.conn.onmessage = (evt) => { this.conn.onmessage = (evt) => {
console.log("Live: message received:", evt.data); this.handleMessage(evt.data);
}; };
this.conn.onerror = (evt) => { this.conn.onerror = (evt) => {
...@@ -61,6 +61,23 @@ export class LiveSrv { ...@@ -61,6 +61,23 @@ export class LiveSrv {
return this.initPromise; return this.initPromise;
} }
handleMessage(message) {
message = JSON.parse(message);
if (!message.stream) {
console.log("Error: stream message without stream!", message);
return;
}
var observer = this.observers[message.stream];
if (!observer) {
this.removeObserver(message.stream, null);
return;
}
observer.next(message);
}
reconnect() { reconnect() {
// no need to reconnect if no one cares // no need to reconnect if no one cares
if (_.keys(this.observers).length === 0) { if (_.keys(this.observers).length === 0) {
...@@ -89,6 +106,7 @@ export class LiveSrv { ...@@ -89,6 +106,7 @@ export class LiveSrv {
} }
removeObserver(stream, observer) { removeObserver(stream, observer) {
console.log('unsubscribe', stream);
delete this.observers[stream]; delete this.observers[stream];
this.getConnection().then(conn => { this.getConnection().then(conn => {
......
...@@ -2,11 +2,13 @@ ...@@ -2,11 +2,13 @@
import {liveSrv} from 'app/core/core'; import {liveSrv} from 'app/core/core';
import {Observable} from 'vendor/npm/rxjs/Observable';
export class GrafanaStreamDS { export class GrafanaStreamDS {
subscription: any; subscription: any;
/** @ngInject */ /** @ngInject */
constructor(private $q) { constructor() {
} }
...@@ -16,11 +18,23 @@ export class GrafanaStreamDS { ...@@ -16,11 +18,23 @@ export class GrafanaStreamDS {
} }
var target = options.targets[0]; var target = options.targets[0];
if (this.subscription) {
if (this.subscription.stream !== target.stream) {
this.subscription.unsubscribe();
} else {
return Promise.resolve({data: []});
}
}
var observable = liveSrv.subscribe(target.stream); var observable = liveSrv.subscribe(target.stream);
this.subscription = observable.subscribe(data => { this.subscription = observable.subscribe(data => {
console.log("grafana stream ds data!", data); console.log("grafana stream ds data!", data);
}); });
this.subscription.stream = target.stream;
return Promise.resolve({data: []}); return Promise.resolve({data: []});
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment