Commit b91e9fac by Ryan McKinley Committed by GitHub

Live: update centrifuge and the ChannelHandler api (#28843)

parent d736fcdd
...@@ -19,7 +19,7 @@ require ( ...@@ -19,7 +19,7 @@ require (
github.com/beevik/etree v1.1.0 github.com/beevik/etree v1.1.0
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3 github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/centrifugal/centrifuge v0.11.0 github.com/centrifugal/centrifuge v0.13.0
github.com/crewjam/saml v0.4.1 github.com/crewjam/saml v0.4.1
github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew v1.1.1
github.com/deepmap/oapi-codegen v1.3.11 // indirect github.com/deepmap/oapi-codegen v1.3.11 // indirect
......
...@@ -7,15 +7,11 @@ type ChannelPublisher func(channel string, data []byte) error ...@@ -7,15 +7,11 @@ type ChannelPublisher func(channel string, data []byte) error
// ChannelHandler defines the core channel behavior // ChannelHandler defines the core channel behavior
type ChannelHandler interface { type ChannelHandler interface {
// This is called fast and often -- it must be synchrnozed // OnSubscribe is called when a client wants to subscribe to a channel
GetChannelOptions(id string) centrifuge.ChannelOptions OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error)
// Called when a client wants to subscribe to a channel // OnPublish is called when a client writes a message to the channel websocket.
OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error)
// AllowBroadcast is called when a client writes a message to the channel websocket.
// Returning an error will cancel the broadcast.
AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error
} }
// ChannelHandlerFactory should be implemented by all core features. // ChannelHandlerFactory should be implemented by all core features.
......
package features package features
import ( import (
"time"
"github.com/centrifugal/centrifuge" "github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
) )
// BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels // BroadcastRunner will simply broadcast all events to `grafana/broadcast/*` channels
// This assumes that data is a JSON object // This assumes that data is a JSON object
type BroadcastRunner struct { type BroadcastRunner struct{}
}
// GetHandlerForPath called on init // GetHandlerForPath called on init
func (b *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) { func (b *BroadcastRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) {
return b, nil // for now all channels share config return b, nil // all dashboards share the same handler
}
// GetChannelOptions called fast and often
func (b *BroadcastRunner) GetChannelOptions(id string) centrifuge.ChannelOptions {
return centrifuge.ChannelOptions{}
} }
// OnSubscribe for now allows anyone to subscribe to any dashboard // OnSubscribe will let anyone connect to the path
func (b *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { func (b *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
// anyone can subscribe return centrifuge.SubscribeReply{
return nil Options: centrifuge.SubscribeOptions{
Presence: true,
JoinLeave: true,
Recover: true, // loads the saved value from history
},
}, nil
} }
// AllowBroadcast checks if a message can be broadcast on this channel // OnPublish is called when a client wants to broadcast on the websocket
func (b *BroadcastRunner) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error { func (b *BroadcastRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return nil return centrifuge.PublishReply{
Options: centrifuge.PublishOptions{
HistorySize: 1, // The last message is saved for 10 mins
HistoryTTL: 10 * time.Minute,
},
}, nil
} }
...@@ -21,42 +21,39 @@ type DashboardHandler struct { ...@@ -21,42 +21,39 @@ type DashboardHandler struct {
} }
// GetHandlerForPath called on init // GetHandlerForPath called on init
func (g *DashboardHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) { func (h *DashboardHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) {
return g, nil // all dashboards share the same handler return h, nil // all dashboards share the same handler
}
// GetChannelOptions called fast and often
func (g *DashboardHandler) GetChannelOptions(id string) centrifuge.ChannelOptions {
return centrifuge.ChannelOptions{
Presence: true,
JoinLeave: true, // if enterprise?
}
} }
// OnSubscribe for now allows anyone to subscribe to any dashboard // OnSubscribe for now allows anyone to subscribe to any dashboard
func (g *DashboardHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { func (h *DashboardHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
// TODO? check authentication return centrifuge.SubscribeReply{
return nil Options: centrifuge.SubscribeOptions{
Presence: true,
JoinLeave: true,
},
}, nil
} }
// AllowBroadcast checks if a message from the websocket can be broadcast on this channel // OnPublish is called when someone begins to edit a dashoard
// currently messages are sent when a dashboard starts editing func (h *DashboardHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
func (g *DashboardHandler) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error { return centrifuge.PublishReply{
return nil Options: centrifuge.PublishOptions{},
}, nil
} }
// DashboardSaved should broadcast to the appropriate stream // DashboardSaved should broadcast to the appropriate stream
func (g *DashboardHandler) publish(event dashboardEvent) error { func (h *DashboardHandler) publish(event dashboardEvent) error {
msg, err := json.Marshal(event) msg, err := json.Marshal(event)
if err != nil { if err != nil {
return err return err
} }
return g.Publisher("grafana/dashboard/"+event.UID, msg) return h.Publisher("grafana/dashboard/"+event.UID, msg)
} }
// DashboardSaved will broadcast to all connected dashboards // DashboardSaved will broadcast to all connected dashboards
func (g *DashboardHandler) DashboardSaved(uid string, userID int64) error { func (h *DashboardHandler) DashboardSaved(uid string, userID int64) error {
return g.publish(dashboardEvent{ return h.publish(dashboardEvent{
UID: uid, UID: uid,
Action: "saved", Action: "saved",
UserID: userID, UserID: userID,
...@@ -64,8 +61,8 @@ func (g *DashboardHandler) DashboardSaved(uid string, userID int64) error { ...@@ -64,8 +61,8 @@ func (g *DashboardHandler) DashboardSaved(uid string, userID int64) error {
} }
// DashboardDeleted will broadcast to all connected dashboards // DashboardDeleted will broadcast to all connected dashboards
func (g *DashboardHandler) DashboardDeleted(uid string, userID int64) error { func (h *DashboardHandler) DashboardDeleted(uid string, userID int64) error {
return g.publish(dashboardEvent{ return h.publish(dashboardEvent{
UID: uid, UID: uid,
Action: "deleted", Action: "deleted",
UserID: userID, UserID: userID,
......
...@@ -21,20 +21,15 @@ func (m *MeasurementsRunner) GetHandlerForPath(path string) (models.ChannelHandl ...@@ -21,20 +21,15 @@ func (m *MeasurementsRunner) GetHandlerForPath(path string) (models.ChannelHandl
return m, nil // for now all channels share config return m, nil // for now all channels share config
} }
// GetChannelOptions gets channel options. // OnSubscribe will let anyone connect to the path
// It gets called fast and often. func (m *MeasurementsRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
func (m *MeasurementsRunner) GetChannelOptions(id string) centrifuge.ChannelOptions { return centrifuge.SubscribeReply{}, nil
return centrifuge.ChannelOptions{}
} }
// OnSubscribe for now allows anyone to subscribe to any dashboard. // OnPublish is called when a client wants to broadcast on the websocket
func (m *MeasurementsRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error {
// anyone can subscribe
return nil
}
// AllowBroadcast checks if a message from the websocket can be broadcast on this channel
// Currently this sends measurements over websocket -- should be replaced with the HTTP interface // Currently this sends measurements over websocket -- should be replaced with the HTTP interface
func (m *MeasurementsRunner) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error { func (m *MeasurementsRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return nil return centrifuge.PublishReply{
Options: centrifuge.PublishOptions{},
}, nil
} }
...@@ -27,12 +27,12 @@ type TestDataSupplier struct { ...@@ -27,12 +27,12 @@ type TestDataSupplier struct {
// GetHandlerForPath gets the channel handler for a path. // GetHandlerForPath gets the channel handler for a path.
// Called on init. // Called on init.
func (g *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) { func (s *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) {
channel := "grafana/testdata/" + path channel := "grafana/testdata/" + path
if path == "random-2s-stream" { if path == "random-2s-stream" {
return &testDataRunner{ return &testDataRunner{
publisher: g.Publisher, publisher: s.Publisher,
running: false, running: false,
speedMillis: 2000, speedMillis: 2000,
dropPercent: 0, dropPercent: 0,
...@@ -43,7 +43,7 @@ func (g *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler ...@@ -43,7 +43,7 @@ func (g *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler
if path == "random-flakey-stream" { if path == "random-flakey-stream" {
return &testDataRunner{ return &testDataRunner{
publisher: g.Publisher, publisher: s.Publisher,
running: false, running: false,
speedMillis: 400, speedMillis: 400,
dropPercent: .6, dropPercent: .6,
...@@ -54,39 +54,32 @@ func (g *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler ...@@ -54,39 +54,32 @@ func (g *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler
return nil, fmt.Errorf("unknown channel") return nil, fmt.Errorf("unknown channel")
} }
// GetChannelOptions gets channel options. // OnSubscribe will let anyone connect to the path
// Called fast and often. func (r *testDataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
func (g *testDataRunner) GetChannelOptions(id string) centrifuge.ChannelOptions { if !r.running {
return centrifuge.ChannelOptions{} r.running = true
}
// OnSubscribe for now allows anyone to subscribe to any dashboard.
func (g *testDataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error {
if !g.running {
g.running = true
// Run in the background // Run in the background
go g.runRandomCSV() go r.runRandomCSV()
} }
// TODO? check authentication return centrifuge.SubscribeReply{}, nil
return nil
} }
// AllowBroadcast checks if a message from the websocket can be broadcast on this channel // OnPublish checks if a message from the websocket can be broadcast on this channel
func (g *testDataRunner) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error { func (r *testDataRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return fmt.Errorf("can not publish to testdata") return centrifuge.PublishReply{}, fmt.Errorf("can not publish to testdata")
} }
// runRandomCSV is just for an example. // runRandomCSV is just for an example.
func (g *testDataRunner) runRandomCSV() { func (r *testDataRunner) runRandomCSV() {
spread := 50.0 spread := 50.0
walker := rand.Float64() * 100 walker := rand.Float64() * 100
ticker := time.NewTicker(time.Duration(g.speedMillis) * time.Millisecond) ticker := time.NewTicker(time.Duration(r.speedMillis) * time.Millisecond)
measurement := models.Measurement{ measurement := models.Measurement{
Name: g.name, Name: r.name,
Time: 0, Time: 0,
Values: make(map[string]interface{}, 5), Values: make(map[string]interface{}, 5),
} }
...@@ -95,7 +88,7 @@ func (g *testDataRunner) runRandomCSV() { ...@@ -95,7 +88,7 @@ func (g *testDataRunner) runRandomCSV() {
} }
for t := range ticker.C { for t := range ticker.C {
if rand.Float64() <= g.dropPercent { if rand.Float64() <= r.dropPercent {
continue continue
} }
delta := rand.Float64() - 0.5 delta := rand.Float64() - 0.5
...@@ -112,9 +105,9 @@ func (g *testDataRunner) runRandomCSV() { ...@@ -112,9 +105,9 @@ func (g *testDataRunner) runRandomCSV() {
continue continue
} }
err = g.publisher(g.channel, bytes) err = r.publisher(r.channel, bytes)
if err != nil { if err != nil {
logger.Warn("write", "channel", g.channel, "measurement", measurement) logger.Warn("write", "channel", r.channel, "measurement", measurement)
} }
} }
} }
...@@ -74,20 +74,6 @@ func (g *GrafanaLive) Init() error { ...@@ -74,20 +74,6 @@ func (g *GrafanaLive) Init() error {
// cfg.LogLevel = centrifuge.LogLevelDebug // cfg.LogLevel = centrifuge.LogLevelDebug
cfg.LogHandler = handleLog cfg.LogHandler = handleLog
// This function is called fast and often -- it must be sychronized
cfg.ChannelOptionsFunc = func(channel string) (centrifuge.ChannelOptions, bool, error) {
handler, err := g.GetChannelHandler(channel)
if err != nil {
logger.Error("ChannelOptionsFunc", "channel", channel, "err", err)
if err.Error() == "404" { // ????
return centrifuge.ChannelOptions{}, false, nil
}
return centrifuge.ChannelOptions{}, true, err
}
opts := handler.GetChannelOptions(channel)
return opts, true, nil
}
// Node is the core object in Centrifuge library responsible for many useful // Node is the core object in Centrifuge library responsible for many useful
// things. For example Node allows to publish messages to channels from server // things. For example Node allows to publish messages to channels from server
// side with its Publish method, but in this example we will publish messages // side with its Publish method, but in this example we will publish messages
...@@ -115,57 +101,29 @@ func (g *GrafanaLive) Init() error { ...@@ -115,57 +101,29 @@ func (g *GrafanaLive) Init() error {
// inside handler must be synchronized since it will be called concurrently from // inside handler must be synchronized since it will be called concurrently from
// different goroutines (belonging to different client connections). This is also // different goroutines (belonging to different client connections). This is also
// true for other event handlers. // true for other event handlers.
node.OnConnect(func(c *centrifuge.Client) { node.OnConnect(func(client *centrifuge.Client) {
// In our example transport will always be Websocket but it can also be SockJS. logger.Debug("Client connected", "user", client.UserID())
transportName := c.Transport().Name()
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
// In our example clients connect with JSON protocol but it can also be Protobuf. handler, err := g.GetChannelHandler(e.Channel)
transportEncoding := c.Transport().Encoding() if err != nil {
logger.Debug("client connected", "transport", transportName, "encoding", transportEncoding) cb(centrifuge.SubscribeReply{}, err)
}) } else {
cb(handler.OnSubscribe(client, e))
// Set Disconnect handler to react on client disconnect events. }
node.OnDisconnect(func(c *centrifuge.Client, e centrifuge.DisconnectEvent) { })
logger.Info("client disconnected")
}) // Called when a client writes to the websocket channel.
// In general, we should prefer writing to the HTTP API, but this
// Set SubscribeHandler to react on every channel subscription attempt // allows some simple prototypes to work quickly
// initiated by client. Here you can theoretically return an error or client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
// disconnect client from server if needed. But now we just accept handler, err := g.GetChannelHandler(e.Channel)
// all subscriptions to all channels. In real life you may use a more if err != nil {
// complex permission check here. cb(centrifuge.PublishReply{}, err)
node.OnSubscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { } else {
reply := centrifuge.SubscribeReply{} cb(handler.OnPublish(client, e))
}
handler, err := g.GetChannelHandler(e.Channel) })
if err != nil {
return reply, err
}
err = handler.OnSubscribe(c, e)
if err != nil {
return reply, err
}
return reply, nil
})
node.OnUnsubscribe(func(c *centrifuge.Client, e centrifuge.UnsubscribeEvent) {
logger.Debug("unsubscribe from channel", "channel", e.Channel, "user", c.UserID())
})
// Called when a client writes to the websocket channel.
// In general, we should prefer writing to the HTTP API, but this
// allows some simple prototypes to work quickly
node.OnPublish(func(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
reply := centrifuge.PublishReply{}
handler, err := g.GetChannelHandler(e.Channel)
if err != nil {
return reply, err
}
err = handler.AllowBroadcast(c, e)
return centrifuge.PublishReply{}, err
}) })
// Run node. This method does not block. // Run node. This method does not block.
......
...@@ -12,21 +12,16 @@ type PluginHandler struct { ...@@ -12,21 +12,16 @@ type PluginHandler struct {
} }
// GetHandlerForPath called on init // GetHandlerForPath called on init
func (g *PluginHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) { func (h *PluginHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) {
return g, nil // all dashboards share the same handler return h, nil // all dashboards share the same handler
} }
// GetChannelOptions called fast and often // OnSubscribe for now allows anyone to subscribe
func (g *PluginHandler) GetChannelOptions(id string) centrifuge.ChannelOptions { func (h *PluginHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
return centrifuge.ChannelOptions{} return centrifuge.SubscribeReply{}, nil
} }
// OnSubscribe for now allows anyone to subscribe to any dashboard // OnPublish checks if a message from the websocket can be broadcast on this channel
func (g *PluginHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { func (h *PluginHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return nil // anyone can subscribe return centrifuge.PublishReply{}, nil // broadcast any event
}
// AllowBroadcast checks if a message from the websocket can be broadcast on this channel
func (g *PluginHandler) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error {
return nil // broadcast any event
} }
...@@ -55,19 +55,13 @@ func (s *LogQueryRunnerSupplier) GetHandlerForPath(path string) (models.ChannelH ...@@ -55,19 +55,13 @@ func (s *LogQueryRunnerSupplier) GetHandlerForPath(path string) (models.ChannelH
}, nil }, nil
} }
// GetChannelOptions gets channel options.
// It's called fast and often.
func (r *logQueryRunner) GetChannelOptions(id string) centrifuge.ChannelOptions {
return centrifuge.ChannelOptions{}
}
// OnSubscribe publishes results from the corresponding CloudWatch Logs query to the provided channel // OnSubscribe publishes results from the corresponding CloudWatch Logs query to the provided channel
func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error { func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
r.runningMu.Lock() r.runningMu.Lock()
defer r.runningMu.Unlock() defer r.runningMu.Unlock()
if _, ok := r.running[e.Channel]; ok { if _, ok := r.running[e.Channel]; ok {
return nil return centrifuge.SubscribeReply{}, nil
} }
r.running[e.Channel] = true r.running[e.Channel] = true
...@@ -77,12 +71,12 @@ func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscrib ...@@ -77,12 +71,12 @@ func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscrib
} }
}() }()
return nil return centrifuge.SubscribeReply{}, nil
} }
// AllowBroadcast checks if a message from the websocket can be broadcast on this channel // OnPublish checks if a message from the websocket can be broadcast on this channel
func (r *logQueryRunner) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error { func (r *logQueryRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
return fmt.Errorf("can not publish") return centrifuge.PublishReply{}, fmt.Errorf("can not publish")
} }
func (r *logQueryRunner) publishResults(channelName string) error { func (r *logQueryRunner) publishResults(channelName string) error {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment