Commit aa40320b by Ryan McKinley Committed by GitHub

Live: check logged-in-user for grafana live (#27436)

parent a587bf4f
......@@ -19,7 +19,7 @@ require (
github.com/beevik/etree v1.1.0 // indirect
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/centrifugal/centrifuge v0.10.0
github.com/centrifugal/centrifuge v0.11.0
github.com/crewjam/saml v0.0.0-20191031171751-c42136edf9b1
github.com/davecgh/go-spew v1.1.1
github.com/deepmap/oapi-codegen v1.3.11 // indirect
......
......@@ -124,26 +124,40 @@ func InitalizeBroker() (*GrafanaLive, error) {
})
b.Handler = func(ctx *models.ReqContext) {
// Put authentication Credentials into request Context. Since we don't
// have any session backend here we simply set user ID as empty string.
// Users with empty ID called anonymous users, in real app you should
// decide whether anonymous users allowed to connect to your server
// or not. There is also another way to set Credentials - returning them
// from ConnectingHandler which is called after client sent first command
// to server called Connect. See _examples folder in repo to find real-life
// auth samples (OAuth2, Gin sessions, JWT etc).
user := ctx.SignedInUser
if user == nil {
ctx.Resp.WriteHeader(401)
return
}
dto := models.UserProfileDTO{
Id: user.UserId,
Name: user.Name,
Email: user.Email,
Login: user.Login,
IsGrafanaAdmin: user.IsGrafanaAdmin,
OrgId: user.OrgId,
}
jsonData, err := json.Marshal(dto)
if err != nil {
logger.Debug("error reading user", "dto", dto)
ctx.Resp.WriteHeader(404)
return
}
logger.Info("Logged in user", "user", user)
cred := &centrifuge.Credentials{
UserID: "",
UserID: fmt.Sprintf("%d", user.UserId),
Info: jsonData,
}
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
path := ctx.Req.URL.Path
logger.Debug("Handle", "path", path)
r := ctx.Req.Request
r = r.WithContext(newCtx) // Set a user ID
// Check if this is a direct websocket connection
path := ctx.Req.URL.Path
if strings.Contains(path, "live/ws") {
wsHandler.ServeHTTP(ctx.Resp, r)
return
......
......@@ -19,7 +19,6 @@ import { colors, JsonExplorer } from '@grafana/ui/';
import { infoPopover } from './components/info_popover';
import { arrayJoin } from './directives/array_join';
import { liveSrv } from './live/live_srv';
import { Emitter } from './utils/emitter';
import { switchDirective } from './components/switch';
import { dashboardSelector } from './components/dashboard_selector';
......@@ -46,7 +45,6 @@ export {
registerAngularDirectives,
arrayJoin,
coreModule,
liveSrv,
switchDirective,
infoPopover,
Emitter,
......
import _ from 'lodash';
import config from 'app/core/config';
import { Observable } from 'rxjs';
export class LiveSrv {
conn: any;
observers: any;
initPromise: any;
constructor() {
this.observers = {};
}
getWebSocketUrl() {
const l = window.location;
return (l.protocol === 'https:' ? 'wss://' : 'ws://') + l.host + config.appSubUrl + '/ws';
}
getConnection() {
if (this.initPromise) {
return this.initPromise;
}
if (this.conn && this.conn.readyState === 1) {
return Promise.resolve(this.conn);
}
this.initPromise = new Promise((resolve, reject) => {
this.conn = new WebSocket(this.getWebSocketUrl());
this.conn.onclose = (evt: any) => {
reject({ message: 'Connection closed' });
this.initPromise = null;
setTimeout(this.reconnect.bind(this), 2000);
};
this.conn.onmessage = (evt: any) => {
this.handleMessage(evt.data);
};
this.conn.onerror = (evt: any) => {
this.initPromise = null;
reject({ message: 'Connection error' });
};
this.conn.onopen = (evt: any) => {
this.initPromise = null;
resolve(this.conn);
};
});
return this.initPromise;
}
handleMessage(message: any) {
message = JSON.parse(message);
if (!message.stream) {
console.error('Error: stream message without stream!', message);
return;
}
const observer = this.observers[message.stream];
if (!observer) {
this.removeObserver(message.stream, null);
return;
}
observer.next(message);
}
reconnect() {
// no need to reconnect if no one cares
if (_.keys(this.observers).length === 0) {
return;
}
this.getConnection().then((conn: any) => {
_.each(this.observers, (value, key) => {
this.send({ action: 'subscribe', stream: key });
});
});
}
send(data: any) {
this.conn.send(JSON.stringify(data));
}
addObserver(stream: any, observer: any) {
this.observers[stream] = observer;
this.getConnection().then((conn: any) => {
this.send({ action: 'subscribe', stream: stream });
});
}
removeObserver(stream: any, observer: any) {
delete this.observers[stream];
this.getConnection().then((conn: any) => {
this.send({ action: 'unsubscribe', stream: stream });
});
}
subscribe(streamName: string) {
return Observable.create((observer: any) => {
this.addObserver(streamName, observer);
return () => {
this.removeObserver(streamName, observer);
};
});
// return this.init().then(() => {
// this.send({action: 'subscribe', stream: name});
// });
}
}
const instance = new LiveSrv();
export { instance as liveSrv };
......@@ -7,7 +7,7 @@ import Centrifuge, {
SubscribeErrorContext,
} from 'centrifuge/dist/centrifuge.protobuf';
import SockJS from 'sockjs-client';
import { GrafanaLiveSrv, setGrafanaLiveSrv, ChannelHandler } from '@grafana/runtime';
import { GrafanaLiveSrv, setGrafanaLiveSrv, ChannelHandler, config } from '@grafana/runtime';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
import { KeyValue } from '@grafana/data';
......@@ -23,13 +23,10 @@ class CentrifugeSrv implements GrafanaLiveSrv {
standardCallbacks: SubscriptionEvents;
constructor() {
console.log('connecting....');
// TODO: better pick this from the URL
this.centrifuge = new Centrifuge(`http://${location.host}/live/sockjs`, {
this.centrifuge = new Centrifuge(`${config.appUrl}live/sockjs`, {
debug: true,
sockjs: SockJS,
});
this.centrifuge.setToken('ABCD');
this.centrifuge.connect(); // do connection
this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment