Commit 8a5fc003 by Ryan McKinley Committed by GitHub

Live: broadcast events when dashboard is saved (#27583)

Co-authored-by: kay delaney <45561153+kaydelaney@users.noreply.github.com>
Co-authored-by: Torkel Ödegaard <torkel@grafana.org>
parent 44c9aea2
......@@ -36,7 +36,7 @@ export interface LiveChannelConfig<TMessage = any> {
/**
* The channel keeps track of who else is connected to the same channel
*/
hasPresense?: boolean;
hasPresence?: boolean;
/**
* This method will be defined if it is possible to publish in this channel.
......@@ -61,10 +61,19 @@ export enum LiveChannelConnectionState {
Invalid = 'invalid',
}
export enum LiveChannelEventType {
Status = 'status',
Join = 'join',
Leave = 'leave',
Message = 'message',
}
/**
* @experimental
*/
export interface LiveChannelStatus {
export interface LiveChannelStatusEvent {
type: LiveChannelEventType.Status;
/**
* {scope}/{namespace}/{path}
*/
......@@ -85,28 +94,53 @@ export interface LiveChannelStatus {
/**
* The last error.
*
* This will remain in the status until a new message is succesfully recieved from the channel
* This will remain in the status until a new message is succesfully received from the channel
*/
error?: any;
}
/**
* @experimental
*/
export interface LiveChannelJoinLeave {
user: any;
export interface LiveChannelJoinEvent {
type: LiveChannelEventType.Join;
user: any; // @experimental -- will be filled in when we improve the UI
}
export interface LiveChannelLeaveEvent {
type: LiveChannelEventType.Leave;
user: any; // @experimental -- will be filled in when we improve the UI
}
export interface LiveChannelMessageEvent<T> {
type: LiveChannelEventType.Message;
message: T;
}
export type LiveChannelEvent<T = any> =
| LiveChannelStatusEvent
| LiveChannelJoinEvent
| LiveChannelLeaveEvent
| LiveChannelMessageEvent<T>;
export function isLiveChannelStatusEvent<T>(evt: LiveChannelEvent<T>): evt is LiveChannelStatusEvent {
return evt.type === LiveChannelEventType.Status;
}
export function isLiveChannelJoinEvent<T>(evt: LiveChannelEvent<T>): evt is LiveChannelJoinEvent {
return evt.type === LiveChannelEventType.Join;
}
export function isLiveChannelLeaveEvent<T>(evt: LiveChannelEvent<T>): evt is LiveChannelLeaveEvent {
return evt.type === LiveChannelEventType.Leave;
}
export function isLiveChannelMessageEvent<T>(evt: LiveChannelEvent<T>): evt is LiveChannelMessageEvent<T> {
return evt.type === LiveChannelEventType.Message;
}
/**
* @experimental
*/
export interface LiveChannelPresense {
users: any;
}
export interface LiveChannelMessage<TMessage = any> {
type: 'status' | 'message' | 'join' | 'leave';
message: TMessage | LiveChannelStatus | LiveChannelJoinLeave;
export interface LiveChannelPresenceStatus {
users: any; // @experimental -- will be filled in when we improve the UI
}
/**
......@@ -134,14 +168,14 @@ export interface LiveChannel<TMessage = any, TPublish = any> {
/**
* Watch all events in this channel
*/
getStream: () => Observable<LiveChannelMessage<TMessage>>;
getStream: () => Observable<LiveChannelEvent<TMessage>>;
/**
* For channels that support presense, this will request the current state from the server.
* For channels that support presence, this will request the current state from the server.
*
* Join and leave messages will be sent to the open stream
*/
getPresense?: () => Promise<LiveChannelPresense>;
getPresence?: () => Promise<LiveChannelPresenceStatus>;
/**
* Write a message into the channel
......
......@@ -262,6 +262,17 @@ func (hs *HTTPServer) PostDashboard(c *models.ReqContext, cmd models.SaveDashboa
}
}
// Tell everyone listening that the dashboard changed
if hs.Live != nil {
err := hs.Live.GrafanaScope.Dashboards.DashboardSaved(
dashboard.Uid,
c.UserId,
)
if err != nil {
hs.log.Warn("unable to broadcast save event", "uid", dashboard.Uid, "error", err)
}
}
c.TimeRequest(metrics.MApiDashboardSave)
return JSON(200, util.DynMap{
"status": "success",
......
......@@ -79,15 +79,11 @@ func (hs *HTTPServer) Init() error {
// Set up a websocket broker
if hs.Cfg.IsLiveEnabled() { // feature flag
node, err := live.InitalizeBroker()
node, err := live.InitializeBroker()
if err != nil {
return err
}
hs.Live = node
// Spit random walk to example
go live.RunRandomCSV(hs.Live, "grafana/testdata/random-2s-stream", 2000, 0)
go live.RunRandomCSV(hs.Live, "grafana/testdata/random-flakey-stream", 400, .6)
}
hs.macaron = hs.newMacaron()
......
package models
import "github.com/centrifugal/centrifuge"
// ChannelPublisher writes data into a channel
type ChannelPublisher func(channel string, data []byte) error
// ChannelHandler defines the core channel behavior
type ChannelHandler interface {
// This is called fast and often -- it must be synchrnozed
GetChannelOptions(id string) centrifuge.ChannelOptions
// Called when a client wants to subscribe to a channel
OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error
// Called when something writes into the channel. The returned value will be broadcast if len() > 0
OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error)
}
// ChannelHandlerProvider -- this should be implemented by any core feature
type ChannelHandlerProvider interface {
// This is called fast and often -- it must be synchrnozed
GetHandlerForPath(path string) (ChannelHandler, error)
}
// DashboardActivityChannel is a service to advertise dashboard activity
type DashboardActivityChannel interface {
DashboardSaved(uid string, userID int64) error
DashboardDeleted(uid string, userID int64) error
}
package live
import (
"fmt"
"strings"
)
// ChannelIdentifier is the channel id split by parts
type ChannelIdentifier struct {
Scope string // grafana, ds, or plugin
Namespace string // feature, id, or name
Path string // path within the channel handler
}
// ParseChannelIdentifier parses the parts from a channel id:
// ${scope} / ${namespace} / ${path}
func ParseChannelIdentifier(id string) (ChannelIdentifier, error) {
parts := strings.SplitN(id, "/", 3)
if len(parts) == 3 {
return ChannelIdentifier{
Scope: parts[0],
Namespace: parts[1],
Path: parts[2],
}, nil
}
return ChannelIdentifier{}, fmt.Errorf("Invalid channel id: %s", id)
}
package live
import (
"testing"
"github.com/google/go-cmp/cmp"
)
func TestParseChannelIdentifier(t *testing.T) {
ident, err := ParseChannelIdentifier("aaa/bbb/ccc/ddd")
if err != nil {
t.FailNow()
}
ex := ChannelIdentifier{
Scope: "aaa",
Namespace: "bbb",
Path: "ccc/ddd",
}
if diff := cmp.Diff(ident, ex); diff != "" {
t.Fatalf("Result mismatch (-want +got):\n%s", diff)
}
// Check an invalid identifier
_, err = ParseChannelIdentifier("aaa/bbb")
if err == nil {
t.FailNow()
}
}
package live
import (
"encoding/json"
"math/rand"
"time"
)
// channelInfo holds metadata about each channel and is returned on connection.
// Eventually each plugin should control exactly what is in this structure.
type channelInfo struct {
Description string
}
type randomWalkMessage struct {
Time int64
Value float64
Min float64
Max float64
}
// RunRandomCSV just for an example
func RunRandomCSV(broker *GrafanaLive, channel string, speedMillis int, dropPercent float64) {
spread := 50.0
walker := rand.Float64() * 100
ticker := time.NewTicker(time.Duration(speedMillis) * time.Millisecond)
line := randomWalkMessage{}
for t := range ticker.C {
if rand.Float64() <= dropPercent {
continue //
}
delta := rand.Float64() - 0.5
walker += delta
line.Time = t.UnixNano() / int64(time.Millisecond)
line.Value = walker
line.Min = walker - ((rand.Float64() * spread) + 0.01)
line.Max = walker + ((rand.Float64() * spread) + 0.01)
bytes, err := json.Marshal(&line)
if err != nil {
logger.Warn("unable to marshal line", "error", err)
continue
}
v := broker.Publish(channel, bytes)
if !v {
logger.Warn("write", "channel", channel, "line", line, "ok", v)
}
}
}
package features
import (
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/models"
)
// BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels
// This makes no assumptions about the shape of the data and will broadcast it to anyone listening
type BroadcastRunner struct{}
// GetHandlerForPath called on init
func (g *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) {
return g, nil // for now all channels share config
}
// GetChannelOptions called fast and often
func (g *BroadcastRunner) GetChannelOptions(id string) centrifuge.ChannelOptions {
return centrifuge.ChannelOptions{}
}
// OnSubscribe for now allows anyone to subscribe to any dashboard
func (g *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error {
// anyone can subscribe
return nil
}
// OnPublish called when an event is received from the websocket
func (g *BroadcastRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) {
// expect the data to be the right shape?
return e.Data, nil
}
package features
import (
"encoding/json"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/models"
)
// DashboardEvent events related to dashboards
type dashboardEvent struct {
UID string `json:"uid"`
Action string `json:"action"` // saved, editing
UserID int64 `json:"userId,omitempty"`
SessionID string `json:"sessionId,omitempty"`
}
// DashboardHandler manages all the `grafana/dashboard/*` channels
type DashboardHandler struct {
publisher models.ChannelPublisher
}
// CreateDashboardHandler Initialize a dashboard handler
func CreateDashboardHandler(p models.ChannelPublisher) DashboardHandler {
return DashboardHandler{
publisher: p,
}
}
// GetHandlerForPath called on init
func (g *DashboardHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) {
return g, nil // all dashboards share the same handler
}
// GetChannelOptions called fast and often
func (g *DashboardHandler) GetChannelOptions(id string) centrifuge.ChannelOptions {
return centrifuge.ChannelOptions{
Presence: true,
JoinLeave: true, // if enterprise?
}
}
// OnSubscribe for now allows anyone to subscribe to any dashboard
func (g *DashboardHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error {
// TODO? check authentication
return nil
}
// OnPublish called when an event is received from the websocket
func (g *DashboardHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) {
// TODO -- verify and keep track of editors?
return e.Data, nil
}
// DashboardSaved should broadcast to the appropriate stream
func (g *DashboardHandler) publish(event dashboardEvent) error {
msg, err := json.Marshal(event)
if err != nil {
return err
}
return g.publisher("grafana/dashboard/"+event.UID, msg)
}
// DashboardSaved will broadcast to all connected dashboards
func (g *DashboardHandler) DashboardSaved(uid string, userID int64) error {
return g.publish(dashboardEvent{
UID: uid,
Action: "saved",
UserID: userID,
})
}
// DashboardDeleted will broadcast to all connected dashboards
func (g *DashboardHandler) DashboardDeleted(uid string, userID int64) error {
return g.publish(dashboardEvent{
UID: uid,
Action: "deleted",
UserID: userID,
})
}
package features
import (
"encoding/json"
"fmt"
"math/rand"
"time"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
"github.com/grafana/grafana/pkg/models"
)
// TestdataRunner manages all the `grafana/dashboard/*` channels
type testdataRunner struct {
publisher models.ChannelPublisher
running bool
speedMillis int
dropPercent float64
channel string
}
// TestdataSupplier manages all the `grafana/testdata/*` channels
type TestdataSupplier struct {
publisher models.ChannelPublisher
}
// CreateTestdataSupplier Initialize a dashboard handler
func CreateTestdataSupplier(p models.ChannelPublisher) TestdataSupplier {
return TestdataSupplier{
publisher: p,
}
}
// GetHandlerForPath called on init
func (g *TestdataSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) {
channel := "grafana/testdata/" + path
if path == "random-2s-stream" {
return &testdataRunner{
publisher: g.publisher,
running: false,
speedMillis: 2000,
dropPercent: 0,
channel: channel,
}, nil
}
if path == "random-flakey-stream" {
return &testdataRunner{
publisher: g.publisher,
running: false,
speedMillis: 400,
dropPercent: .6,
channel: channel,
}, nil
}
return nil, fmt.Errorf("unknown channel")
}
// GetChannelOptions called fast and often
func (g *testdataRunner) GetChannelOptions(id string) centrifuge.ChannelOptions {
return centrifuge.ChannelOptions{}
}
// OnSubscribe for now allows anyone to subscribe to any dashboard
func (g *testdataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error {
if !g.running {
g.running = true
// Run in the background
go g.runRandomCSV()
}
// TODO? check authentication
return nil
}
// OnPublish called when an event is received from the websocket
func (g *testdataRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) {
return nil, fmt.Errorf("can not publish to testdata")
}
type randomWalkMessage struct {
Time int64
Value float64
Min float64
Max float64
}
// RunRandomCSV just for an example
func (g *testdataRunner) runRandomCSV() {
spread := 50.0
walker := rand.Float64() * 100
ticker := time.NewTicker(time.Duration(g.speedMillis) * time.Millisecond)
line := randomWalkMessage{}
for t := range ticker.C {
if rand.Float64() <= g.dropPercent {
continue
}
delta := rand.Float64() - 0.5
walker += delta
line.Time = t.UnixNano() / int64(time.Millisecond)
line.Value = walker
line.Min = walker - ((rand.Float64() * spread) + 0.01)
line.Max = walker + ((rand.Float64() * spread) + 0.01)
bytes, err := json.Marshal(&line)
if err != nil {
logger.Warn("unable to marshal line", "error", err)
continue
}
err = g.publisher(g.channel, bytes)
if err != nil {
logger.Warn("write", "channel", g.channel, "line", line)
}
}
}
......@@ -4,10 +4,13 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/live/features"
)
var (
......@@ -15,14 +18,39 @@ var (
loggerCF = log.New("live.centrifuge")
)
// CoreGrafanaScope list of core features
type CoreGrafanaScope struct {
Features map[string]models.ChannelHandlerProvider
// The generic service to advertise dashboard changes
Dashboards models.DashboardActivityChannel
}
// GrafanaLive pretends to be the server
type GrafanaLive struct {
node *centrifuge.Node
Handler interface{} // handler func
node *centrifuge.Node
// The websocket handler
Handler interface{}
// Full channel handler
channels map[string]models.ChannelHandler
channelsMu sync.RWMutex
// The core internal features
GrafanaScope CoreGrafanaScope
}
// InitalizeBroker initializes the broker and starts listening for requests.
func InitalizeBroker() (*GrafanaLive, error) {
// InitializeBroker initializes the broker and starts listening for requests.
func InitializeBroker() (*GrafanaLive, error) {
glive := &GrafanaLive{
channels: make(map[string]models.ChannelHandler),
channelsMu: sync.RWMutex{},
GrafanaScope: CoreGrafanaScope{
Features: make(map[string]models.ChannelHandlerProvider),
},
}
// We use default config here as starting point. Default config contains
// reasonable values for available options.
cfg := centrifuge.DefaultConfig
......@@ -30,6 +58,20 @@ func InitalizeBroker() (*GrafanaLive, error) {
// cfg.LogLevel = centrifuge.LogLevelDebug
cfg.LogHandler = handleLog
// This function is called fast and often -- it must be sychronized
cfg.ChannelOptionsFunc = func(channel string) (centrifuge.ChannelOptions, bool, error) {
handler, err := glive.GetChannelHandler(channel)
if err != nil {
logger.Error("ChannelOptionsFunc", "channel", channel, "err", err)
if err.Error() == "404" { // ????
return centrifuge.ChannelOptions{}, false, nil
}
return centrifuge.ChannelOptions{}, true, err
}
opts := handler.GetChannelOptions(channel)
return opts, true, nil
}
// Node is the core object in Centrifuge library responsible for many useful
// things. For example Node allows to publish messages to channels from server
// side with its Publish method, but in this example we will publish messages
......@@ -38,10 +80,16 @@ func InitalizeBroker() (*GrafanaLive, error) {
if err != nil {
return nil, err
}
glive.node = node
b := &GrafanaLive{
node: node,
}
// Initialize the main features
dash := features.CreateDashboardHandler(glive.Publish)
tds := features.CreateTestdataSupplier(glive.Publish)
glive.GrafanaScope.Dashboards = &dash
glive.GrafanaScope.Features["dashboard"] = &dash
glive.GrafanaScope.Features["testdata"] = &tds
glive.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{}
// Set ConnectHandler called when client successfully connected to Node. Your code
// inside handler must be synchronized since it will be called concurrently from
......@@ -56,54 +104,54 @@ func InitalizeBroker() (*GrafanaLive, error) {
logger.Debug("client connected", "transport", transportName, "encoding", transportEncoding)
})
// Set Disconnect handler to react on client disconnect events.
node.OnDisconnect(func(c *centrifuge.Client, e centrifuge.DisconnectEvent) {
logger.Info("client disconnected")
})
// Set SubscribeHandler to react on every channel subscription attempt
// initiated by client. Here you can theoretically return an error or
// disconnect client from server if needed. But now we just accept
// all subscriptions to all channels. In real life you may use a more
// complex permission check here.
node.OnSubscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
info := &channelInfo{
Description: fmt.Sprintf("channel: %s", e.Channel),
reply := centrifuge.SubscribeReply{}
handler, err := glive.GetChannelHandler(e.Channel)
if err != nil {
return reply, err
}
bytes, err := json.Marshal(&info)
err = handler.OnSubscribe(c, e)
if err != nil {
return centrifuge.SubscribeReply{}, err
return reply, err
}
logger.Debug("client subscribes on channel", "channel", e.Channel, "info", string(bytes))
return centrifuge.SubscribeReply{
ExpireAt: 0, // does not expire
ChannelInfo: bytes,
}, nil
return reply, nil
})
node.OnUnsubscribe(func(c *centrifuge.Client, e centrifuge.UnsubscribeEvent) {
s, err := node.PresenceStats(e.Channel)
if err != nil {
logger.Warn("unable to get presence stats", "channel", e.Channel, "error", err)
}
logger.Debug("unsubscribe from channel", "channel", e.Channel, "clients", s.NumClients, "users", s.NumUsers)
logger.Debug("unsubscribe from channel", "channel", e.Channel, "user", c.UserID())
})
// By default, clients can not publish messages into channels. By setting
// PublishHandler we tell Centrifuge that publish from client side is possible.
// Now each time client calls publish method this handler will be called and
// you have a possibility to validate publication request before message will
// be published into channel and reach active subscribers. In our simple chat
// app we allow everyone to publish into any channel.
// Called when something is written to the websocket
node.OnPublish(func(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
// logger.Debug("client publishes into channel", "channel", e.Channel, "body", string(e.Data))
reply := centrifuge.PublishReply{}
handler, err := glive.GetChannelHandler(e.Channel)
if err != nil {
return reply, err
}
// For now, broadcast any messages to everyone
_, err := node.Publish(e.Channel, e.Data)
data, err := handler.OnPublish(c, e)
if err != nil {
return reply, err
}
if len(data) > 0 {
_, err = node.Publish(e.Channel, e.Data)
}
return centrifuge.PublishReply{}, err // returns an error if it could not publish
})
// Set Disconnect handler to react on client disconnect events.
node.OnDisconnect(func(c *centrifuge.Client, e centrifuge.DisconnectEvent) {
logger.Info("client disconnected")
})
// Run node. This method does not block.
if err := node.Run(); err != nil {
return nil, err
......@@ -123,7 +171,7 @@ func InitalizeBroker() (*GrafanaLive, error) {
WriteBufferSize: 1024,
})
b.Handler = func(ctx *models.ReqContext) {
glive.Handler = func(ctx *models.ReqContext) {
user := ctx.SignedInUser
if user == nil {
ctx.Resp.WriteHeader(401)
......@@ -171,16 +219,71 @@ func InitalizeBroker() (*GrafanaLive, error) {
// Unknown path
ctx.Resp.WriteHeader(404)
}
return b, nil
return glive, nil
}
// Publish sends the data to the channel
func (b *GrafanaLive) Publish(channel string, data []byte) bool {
_, err := b.node.Publish(channel, data)
// GetChannelHandler gives threadsafe access to the channel
func (g *GrafanaLive) GetChannelHandler(channel string) (models.ChannelHandler, error) {
g.channelsMu.RLock()
c, ok := g.channels[channel]
g.channelsMu.RUnlock() // defer? but then you can't lock further down
if ok {
return c, nil
}
// Parse the identifier ${scope}/${namespace}/${path}
id, err := ParseChannelIdentifier(channel)
if err != nil {
logger.Warn("error writing to channel", "channel", channel, "err", err)
return nil, err
}
logger.Info("initChannel", "channel", channel, "id", id)
g.channelsMu.Lock()
defer g.channelsMu.Unlock()
c, ok = g.channels[channel] // may have filled in while locked
if ok {
return c, nil
}
return err == nil
c, err = g.initChannel(id)
if err != nil {
return nil, err
}
g.channels[channel] = c
return c, nil
}
func (g *GrafanaLive) initChannel(id ChannelIdentifier) (models.ChannelHandler, error) {
if id.Scope == "grafana" {
p, ok := g.GrafanaScope.Features[id.Namespace]
if ok {
return p.GetHandlerForPath(id.Path)
}
return nil, fmt.Errorf("Unknown feature: %s", id.Namespace)
}
if id.Scope == "ds" {
return nil, fmt.Errorf("todo... look up datasource: %s", id.Namespace)
}
if id.Scope == "plugin" {
p, ok := plugins.Plugins[id.Namespace]
if ok {
h := &PluginHandler{
Plugin: p,
}
return h.GetHandlerForPath(id.Path)
}
return nil, fmt.Errorf("unknown plugin: %s", id.Namespace)
}
return nil, fmt.Errorf("invalid scope: %s", id.Scope)
}
// Publish sends the data to the channel without checking permissions etc
func (g *GrafanaLive) Publish(channel string, data []byte) error {
_, err := g.node.Publish(channel, data)
return err
}
// Write to the standard log15 logger
......
package live
import (
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
)
// PluginHandler manages all the `grafana/dashboard/*` channels
type PluginHandler struct {
Plugin *plugins.PluginBase
}
// GetHandlerForPath called on init
func (g *PluginHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) {
return g, nil // all dashboards share the same handler
}
// GetChannelOptions called fast and often
func (g *PluginHandler) GetChannelOptions(id string) centrifuge.ChannelOptions {
return centrifuge.ChannelOptions{}
}
// OnSubscribe for now allows anyone to subscribe to any dashboard
func (g *PluginHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error {
return nil // anyone can subscribe
}
// OnPublish called when an event is received from the websocket
func (g *PluginHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) {
return e.Data, nil // broadcast any event
}
......@@ -3,6 +3,7 @@ import { shallow } from 'enzyme';
import { FolderPicker } from './FolderPicker';
jest.mock('@grafana/runtime', () => ({
...((jest.requireActual('@grafana/runtime') as unknown) as object),
getBackendSrv: () => ({
search: jest.fn(() => [
{ title: 'Dash 1', id: 'A' },
......
......@@ -169,13 +169,21 @@ export class LiveAdmin extends PureComponent<Props, State> {
<h5>Namespace</h5>
<Select
options={namespaces}
value={namespaces.find(s => s.value === namespace) || ''}
value={namespaces.find(s => s.value === namespace) || namespace || ''}
onChange={this.onNamespaceChanged}
allowCustomValue={true}
backspaceRemovesValue={true}
/>
</div>
<div>
<h5>Path</h5>
<Select options={paths} value={paths.find(s => s.value === path) || ''} onChange={this.onPathChanged} />
<Select
options={paths}
value={paths.find(s => s.value === path) || path || ''}
onChange={this.onPathChanged}
allowCustomValue={true}
backspaceRemovesValue={true}
/>
</div>
</div>
<br />
......
......@@ -3,12 +3,14 @@ import { Unsubscribable, PartialObserver } from 'rxjs';
import { getGrafanaLiveSrv } from '@grafana/runtime';
import {
AppEvents,
isLiveChannelStatusEvent,
LiveChannel,
LiveChannelConfig,
LiveChannelConnectionState,
LiveChannelMessage,
LiveChannelEvent,
LiveChannelEventType,
LiveChannelScope,
LiveChannelStatus,
LiveChannelStatusEvent,
} from '@grafana/data';
import { Input, Button } from '@grafana/ui';
import { appEvents } from 'app/core/core';
......@@ -22,7 +24,7 @@ interface Props {
interface State {
channel?: LiveChannel;
status: LiveChannelStatus;
status: LiveChannelStatusEvent;
count: number;
lastTime: number;
lastBody: string;
......@@ -31,7 +33,12 @@ interface State {
export class LivePanel extends PureComponent<Props, State> {
state: State = {
status: { id: '?', state: LiveChannelConnectionState.Pending, timestamp: Date.now() },
status: {
type: LiveChannelEventType.Status,
id: '?',
state: LiveChannelConnectionState.Pending,
timestamp: Date.now(),
},
count: 0,
lastTime: 0,
lastBody: '',
......@@ -39,15 +46,15 @@ export class LivePanel extends PureComponent<Props, State> {
};
subscription?: Unsubscribable;
streamObserver: PartialObserver<LiveChannelMessage> = {
next: (msg: LiveChannelMessage) => {
if (msg.type === 'status') {
this.setState({ status: msg.message as LiveChannelStatus });
streamObserver: PartialObserver<LiveChannelEvent> = {
next: (event: LiveChannelEvent) => {
if (isLiveChannelStatusEvent(event)) {
this.setState({ status: event });
} else {
this.setState({
count: this.state.count + 1,
lastTime: Date.now(),
lastBody: JSON.stringify(msg),
lastBody: JSON.stringify(event),
});
}
},
......
......@@ -5,6 +5,7 @@ import { DashboardModel } from 'app/features/dashboard/state';
import { act } from 'react-dom/test-utils';
jest.mock('@grafana/runtime', () => ({
...((jest.requireActual('@grafana/runtime') as unknown) as object),
getBackendSrv: () => ({ get: jest.fn().mockResolvedValue([]), search: jest.fn().mockResolvedValue([]) }),
}));
......
......@@ -32,6 +32,7 @@ import { PanelInspector } from '../components/Inspector/PanelInspector';
import { SubMenu } from '../components/SubMenu/SubMenu';
import { cleanUpDashboardAndVariables } from '../state/actions';
import { cancelVariables } from '../../variables/state/actions';
import { dashboardWatcher } from 'app/features/live/dashboard/dashboardWatcher';
export interface Props {
urlUid?: string;
......@@ -116,6 +117,8 @@ export class DashboardPage extends PureComponent<Props, State> {
// entering edit mode
if (!editPanel && urlEditPanelId) {
dashboardWatcher.setEditingState(true);
this.getPanelByIdFromUrlParam(urlEditPanelId, panel => {
// if no edit permission show error
if (!dashboard.canEditPanel(panel)) {
......@@ -129,6 +132,8 @@ export class DashboardPage extends PureComponent<Props, State> {
// leaving edit mode
if (editPanel && !urlEditPanelId) {
dashboardWatcher.setEditingState(false);
this.setState({ editPanel: null });
}
......
......@@ -29,6 +29,7 @@ import { DashboardModel } from './DashboardModel';
import { DataQuery, locationUtil } from '@grafana/data';
import { initVariablesTransaction } from '../../variables/state/actions';
import { emitDashboardViewEvent } from './analyticsProcessor';
import { dashboardWatcher } from 'app/features/live/dashboard/dashboardWatcher';
export interface InitDashboardArgs {
$injector: any;
......@@ -230,6 +231,11 @@ export function initDashboard(args: InitDashboardArgs): ThunkResult<void> {
// send open dashboard event
if (args.routeInfo !== DashboardRouteInfo.New) {
emitDashboardViewEvent(dashboard);
// Listen for changes on the current dashboard
dashboardWatcher.watch(dashboard.uid);
} else {
dashboardWatcher.leave();
}
// yay we are done
......
......@@ -2,11 +2,11 @@ import {
LiveChannelConfig,
LiveChannel,
LiveChannelScope,
LiveChannelStatus,
LiveChannelPresense,
LiveChannelJoinLeave,
LiveChannelMessage,
LiveChannelStatusEvent,
LiveChannelEvent,
LiveChannelEventType,
LiveChannelConnectionState,
LiveChannelPresenceStatus,
} from '@grafana/data';
import Centrifuge, {
JoinLeaveContext,
......@@ -22,7 +22,7 @@ import { Subject, of, merge } from 'rxjs';
* Internal class that maps Centrifuge support to GrafanaLive
*/
export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements LiveChannel<TMessage, TPublish> {
readonly currentStatus: LiveChannelStatus;
readonly currentStatus: LiveChannelStatusEvent;
readonly opened = Date.now();
readonly id: string;
......@@ -30,10 +30,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
readonly namespace: string;
readonly path: string;
readonly stream = new Subject<LiveChannelMessage<TMessage>>();
// When presense is enabled (rarely), this will be initalized
private presense?: Subject<LiveChannelPresense>;
readonly stream = new Subject<LiveChannelEvent<TMessage>>();
/** Static definition of the channel definition. This may describe the channel usage */
config?: LiveChannelConfig;
......@@ -46,6 +43,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
this.namespace = namespace;
this.path = path;
this.currentStatus = {
type: LiveChannelEventType.Status,
id,
timestamp: this.opened,
state: LiveChannelConnectionState.Pending,
......@@ -61,9 +59,12 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
const prepare = config.processMessage ? config.processMessage : (v: any) => v;
const events: SubscriptionEvents = {
// This means a message was recieved from the server
// This means a message was received from the server
publish: (ctx: PublicationContext) => {
this.stream.next(prepare(ctx.data));
this.stream.next({
type: LiveChannelEventType.Message,
message: prepare(ctx.data),
});
// Clear any error messages
if (this.currentStatus.error) {
......@@ -89,27 +90,15 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
},
};
if (config.hasPresense) {
if (config.hasPresence) {
events.join = (ctx: JoinLeaveContext) => {
const message: LiveChannelJoinLeave = {
user: ctx.info.user,
};
this.stream.next({
type: 'join',
message,
});
this.stream.next({ type: LiveChannelEventType.Join, user: ctx.info.user });
};
events.leave = (ctx: JoinLeaveContext) => {
const message: LiveChannelJoinLeave = {
user: ctx.info.user,
};
this.stream.next({
type: 'leave',
message,
});
this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user });
};
this.getPresense = () => {
this.getPresence = () => {
return this.subscription!.presence().then(v => {
return {
users: Object.keys(v.presence),
......@@ -121,21 +110,20 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
}
private sendStatus() {
this.stream.next({ type: 'status', message: { ...this.currentStatus } });
this.stream.next({ ...this.currentStatus });
}
/**
* Get the stream of events and
*/
getStream() {
const status: LiveChannelMessage<TMessage> = { type: 'status', message: { ...this.currentStatus } };
return merge(of(status), this.stream.asObservable());
return merge(of({ ...this.currentStatus }), this.stream.asObservable());
}
/**
* This is configured by the server when the config supports presense
* This is configured by the server when the config supports presence
*/
getPresense?: () => Promise<LiveChannelPresense>;
getPresence?: () => Promise<LiveChannelPresenceStatus>;
/**
* This is configured by the server when config supports writing
......@@ -157,11 +145,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
this.stream.complete();
if (this.presense) {
this.presense.complete();
}
this.stream.next({ type: 'status', message: { ...this.currentStatus } });
this.stream.next({ ...this.currentStatus });
this.stream.complete();
if (this.shutdownCallback) {
......@@ -182,13 +166,6 @@ export function getErrorChannel(
namespace: string,
path: string
): LiveChannel {
const errorStatus: LiveChannelStatus = {
id,
timestamp: Date.now(),
state: LiveChannelConnectionState.Invalid,
error: msg,
};
return {
id,
opened: Date.now(),
......@@ -199,8 +176,11 @@ export function getErrorChannel(
// return an error
getStream: () =>
of({
type: 'status',
message: errorStatus,
type: LiveChannelEventType.Status,
id,
timestamp: Date.now(),
state: LiveChannelConnectionState.Invalid,
error: msg,
}),
// already disconnected
......
import React, { PureComponent } from 'react';
import { Modal, stylesFactory, VerticalGroup } from '@grafana/ui';
import { css } from 'emotion';
import { dashboardWatcher } from './dashboardWatcher';
import { config } from '@grafana/runtime';
import { DashboardEvent, DashboardEventAction } from './types';
import { GrafanaTheme } from '@grafana/data';
interface Props {
event?: DashboardEvent;
}
interface State {
dismiss?: boolean;
}
interface ActionInfo {
label: string;
description: string;
action: () => void;
}
export class DashboardChangedModal extends PureComponent<Props, State> {
state: State = {};
discardAndReload: ActionInfo = {
label: 'Discard local changes',
description: 'Load the latest saved version for this dashboard',
action: () => {
dashboardWatcher.reloadPage();
this.onDismiss();
},
};
continueEditing: ActionInfo = {
label: 'Continue editing',
description:
'Keep your local changes and continue editing. Note: when you save, this will overwrite the most recent chages',
action: () => {
this.onDismiss();
},
};
acceptDelete: ActionInfo = {
label: 'Discard Local changes',
description: 'view grafana homepage',
action: () => {
// Navigate to the root URL
document.location.href = config.appUrl;
},
};
onDismiss = () => {
this.setState({ dismiss: true });
};
render() {
const { event } = this.props;
const { dismiss } = this.state;
const styles = getStyles(config.theme);
const isDelete = event?.action === DashboardEventAction.Deleted;
const options = isDelete
? [this.continueEditing, this.acceptDelete]
: [this.continueEditing, this.discardAndReload];
return (
<Modal
isOpen={!dismiss}
title="Dashboard Changed"
icon="copy"
onDismiss={this.onDismiss}
className={styles.modal}
>
<div>
{isDelete ? (
<div>This dashboard has been deleted by another session</div>
) : (
<div>This dashboard has been modifed by another session</div>
)}
<br />
<VerticalGroup>
{options.map(opt => {
return (
<div key={opt.label} onClick={opt.action} className={styles.radioItem}>
<h3>{opt.label}</h3>
{opt.description}
</div>
);
})}
</VerticalGroup>
<br />
</div>
</Modal>
);
}
}
const getStyles = stylesFactory((theme: GrafanaTheme) => {
return {
modal: css`
width: 500px;
`,
radioItem: css`
margin: 0;
margin-left: ${theme.spacing.md};
font-size: ${theme.typography.size.sm};
color: ${theme.colors.textWeak};
`,
};
});
import { getGrafanaLiveSrv, getLegacyAngularInjector } from '@grafana/runtime';
import { getDashboardSrv } from '../../dashboard/services/DashboardSrv';
import { appEvents } from 'app/core/core';
import {
AppEvents,
LiveChannel,
LiveChannelScope,
LiveChannelEvent,
LiveChannelConfig,
LiveChannelConnectionState,
isLiveChannelStatusEvent,
isLiveChannelMessageEvent,
} from '@grafana/data';
import { CoreEvents } from 'app/types';
import { DashboardChangedModal } from './DashboardChangedModal';
import { DashboardEvent, DashboardEventAction } from './types';
import { CoreGrafanaLiveFeature } from '../scopes';
import { sessionId } from '../live';
class DashboardWatcher {
channel?: LiveChannel<DashboardEvent>;
uid?: string;
ignoreSave?: boolean;
editing = false;
setEditingState(state: boolean) {
const changed = (this.editing = state);
this.editing = state;
if (changed) {
this.sendEditingState();
}
}
private sendEditingState() {
if (!this.channel?.publish) {
return;
}
const msg: DashboardEvent = {
sessionId,
uid: this.uid!,
action: this.editing ? DashboardEventAction.EditingStarted : DashboardEventAction.EditingCanceled,
message: 'user (name)',
};
this.channel!.publish!(msg);
}
watch(uid: string) {
const live = getGrafanaLiveSrv();
if (!live) {
return;
}
// Check for changes
if (uid !== this.uid) {
this.leave();
this.channel = live.getChannel(LiveChannelScope.Grafana, 'dashboard', uid);
this.channel.getStream().subscribe(this.observer);
this.uid = uid;
}
console.log('Watch', uid);
}
leave() {
if (this.channel) {
this.channel.disconnect();
}
this.uid = undefined;
}
ignoreNextSave() {
this.ignoreSave = true;
}
observer = {
next: (event: LiveChannelEvent<DashboardEvent>) => {
// Send the editing state when connection starts
if (isLiveChannelStatusEvent(event) && this.editing && event.state === LiveChannelConnectionState.Connected) {
this.sendEditingState();
}
if (isLiveChannelMessageEvent(event)) {
if (event.message.sessionId === sessionId) {
return; // skip internal messages
}
const { action } = event.message;
switch (action) {
case DashboardEventAction.EditingStarted:
case DashboardEventAction.Saved: {
if (this.ignoreSave) {
this.ignoreSave = false;
return;
}
const dash = getDashboardSrv().getCurrent();
if (dash.uid !== event.message.uid) {
console.log('dashboard event for differnt dashboard?', event, dash);
return;
}
const changeTracker = getLegacyAngularInjector().get<any>('unsavedChangesSrv').tracker;
const showPopup = this.editing || changeTracker.hasChanges();
if (action === DashboardEventAction.Saved) {
if (showPopup) {
appEvents.emit(CoreEvents.showModalReact, {
component: DashboardChangedModal,
props: { event },
});
} else {
appEvents.emit(AppEvents.alertSuccess, ['Dashboard updated']);
this.reloadPage();
}
} else if (showPopup) {
if (action === DashboardEventAction.EditingStarted) {
appEvents.emit(AppEvents.alertWarning, [
'Another session is editing this dashboard',
event.message.message,
]);
}
}
return;
}
}
}
console.log('DashboardEvent EVENT', event);
},
};
reloadPage() {
const $route = getLegacyAngularInjector().get<any>('$route');
if ($route) {
$route.reload();
} else {
location.reload();
}
}
}
export const dashboardWatcher = new DashboardWatcher();
export function getDashboardChannelsFeature(): CoreGrafanaLiveFeature {
const dashboardConfig: LiveChannelConfig = {
path: '${uid}',
description: 'Dashboard change events',
variables: [{ value: 'uid', label: '${uid}', description: 'unique id for a dashboard' }],
hasPresence: true,
canPublish: () => true,
};
return {
name: 'dashboard',
support: {
getChannelConfig: (path: string) => {
return {
...dashboardConfig,
path, // set the real path
};
},
getSupportedPaths: () => [dashboardConfig],
},
description: 'Dashboard listener',
};
}
export enum DashboardEventAction {
Saved = 'saved',
EditingStarted = 'editing-started', // Sent when someone (who can save!) opens the editor
EditingCanceled = 'editing-cancelled', // Sent when someone discards changes, or unsubscribes while editing
Deleted = 'deleted',
}
export interface DashboardEvent {
uid: string;
action: DashboardEventAction;
userId?: number;
message?: string;
sessionId?: string;
}
import { LiveChannelConfig } from '@grafana/data';
import { getDashboardChannelsFeature } from './dashboard/dashboardWatcher';
import { grafanaLiveCoreFeatures } from './scopes';
export function registerLiveFeatures() {
......@@ -13,35 +14,34 @@ export function registerLiveFeatures() {
},
];
grafanaLiveCoreFeatures.register(
'testdata',
{
grafanaLiveCoreFeatures.register({
name: 'testdata',
support: {
getChannelConfig: (path: string) => {
return channels.find(c => c.path === path);
},
getSupportedPaths: () => channels,
},
'Test data generations'
);
description: 'Test data generations',
});
const chatConfig: LiveChannelConfig = {
path: 'chat',
description: 'Broadcast text messages to a channel',
const broadcastConfig: LiveChannelConfig = {
path: '${path}',
description: 'Broadcast any messages to a channel',
canPublish: () => true,
hasPresense: true,
};
grafanaLiveCoreFeatures.register(
'experimental',
{
grafanaLiveCoreFeatures.register({
name: 'broadcast',
support: {
getChannelConfig: (path: string) => {
if ('chat' === path) {
return chatConfig;
}
throw new Error('invalid path: ' + path);
return broadcastConfig;
},
getSupportedPaths: () => [chatConfig],
getSupportedPaths: () => [broadcastConfig],
},
'Experimental features'
);
description: 'Broadcast will send/recieve any events on a channel',
});
// dashboard/*
grafanaLiveCoreFeatures.register(getDashboardChannelsFeature());
}
......@@ -12,6 +12,15 @@ import {
} from './scopes';
import { registerLiveFeatures } from './features';
export const sessionId =
(window as any)?.grafanaBootData?.user?.id +
'/' +
Date.now().toString(16) +
'/' +
Math.random()
.toString(36)
.substring(2, 15);
export class CentrifugeSrv implements GrafanaLiveSrv {
readonly open = new Map<string, CentrifugeLiveChannel>();
......@@ -25,6 +34,9 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
debug: true,
sockjs: SockJS,
});
this.centrifuge.setConnectData({
sessionId,
});
this.centrifuge.connect(); // do connection
this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected());
this.connectionBlocker = new Promise<void>(resolve => {
......@@ -94,7 +106,7 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
};
this.open.set(id, channel);
// Initalize the channel in the bacground
// Initialize the channel in the background
this.initChannel(scope, channel).catch(err => {
channel?.shutdownWithError(err);
this.open.delete(id);
......@@ -113,13 +125,13 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
if (!config) {
throw new Error('unknown path: ' + channel.path);
}
if (config.canPublish?.()) {
channel.publish = (data: any) => this.centrifuge.publish(channel.id, data);
}
const events = channel.initalize(config);
if (!this.centrifuge.isConnected()) {
await this.connectionBlocker;
}
if (config.canPublish && config.canPublish()) {
channel.publish = (data: any) => this.centrifuge.publish(channel.id, data);
}
channel.subscription = this.centrifuge.subscribe(channel.id, events);
return;
}
......
......@@ -17,6 +17,12 @@ export abstract class GrafanaLiveScope {
abstract async listNamespaces(): Promise<Array<SelectableValue<string>>>;
}
export interface CoreGrafanaLiveFeature {
name: string;
support: LiveChannelSupport;
description: string;
}
class GrafanaLiveCoreScope extends GrafanaLiveScope {
readonly features = new Map<string, LiveChannelSupport>();
readonly namespaces: Array<SelectableValue<string>> = [];
......@@ -25,14 +31,13 @@ class GrafanaLiveCoreScope extends GrafanaLiveScope {
super(LiveChannelScope.Grafana);
}
register(feature: string, support: LiveChannelSupport, description: string): GrafanaLiveCoreScope {
this.features.set(feature, support);
register(feature: CoreGrafanaLiveFeature) {
this.features.set(feature.name, feature.support);
this.namespaces.push({
value: feature,
label: feature,
description,
value: feature.name,
label: feature.name,
description: feature.description,
});
return this;
}
/**
......
......@@ -12,6 +12,7 @@ import {
import { updateLocation } from 'app/core/actions';
import { ThunkResult, FolderInfo, DashboardDTO, DashboardDataDTO } from 'app/types';
import { appEvents } from '../../../core/core';
import { dashboardWatcher } from 'app/features/live/dashboard/dashboardWatcher';
export function fetchGcomDashboard(id: string): ThunkResult<void> {
return async dispatch => {
......@@ -205,6 +206,8 @@ export interface SaveDashboardOptions {
}
export function saveDashboard(options: SaveDashboardOptions) {
dashboardWatcher.ignoreNextSave();
return getBackendSrv().post('/api/dashboards/db/', {
dashboard: options.dashboard,
message: options.message ?? '',
......
......@@ -78,7 +78,7 @@ export default class Datasource extends DataSourceApi<AzureMonitorQuery, AzureDa
continue;
}
// Initalize the list of queries
// Initialize the list of queries
let q = byType[target.queryType];
if (!q) {
q = _.cloneDeep(options);
......
......@@ -76,7 +76,7 @@ export class GrafanaCtrl {
},
});
// Initalize websocket event streaming
// Initialize websocket event streaming
if (config.featureToggles.live) {
initGrafanaLive();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment