Commit b7a8416e by Ryan McKinley Committed by GitHub

Live: improve broadcast semantics and avoid double posting (#28765)

parent 5a11abe9
...@@ -13,8 +13,9 @@ type ChannelHandler interface { ...@@ -13,8 +13,9 @@ type ChannelHandler interface {
// Called when a client wants to subscribe to a channel // Called when a client wants to subscribe to a channel
OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error
// Called when something writes into the channel. The returned value will be broadcast if len() > 0 // AllowBroadcast is called when a client writes a message to the channel websocket.
OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) // Returning an error will cancel the broadcast.
AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error
} }
// ChannelHandlerFactory should be implemented by all core features. // ChannelHandlerFactory should be implemented by all core features.
......
...@@ -26,8 +26,7 @@ func (b *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscri ...@@ -26,8 +26,7 @@ func (b *BroadcastRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscri
return nil return nil
} }
// OnPublish called when an event is received from the websocket // AllowBroadcast checks if a message can be broadcast on this channel
func (b *BroadcastRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { func (b *BroadcastRunner) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error {
// expect the data to be the right shape? return nil
return e.Data, nil
} }
...@@ -39,10 +39,10 @@ func (g *DashboardHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscr ...@@ -39,10 +39,10 @@ func (g *DashboardHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscr
return nil return nil
} }
// OnPublish called when an event is received from the websocket // AllowBroadcast checks if a message from the websocket can be broadcast on this channel
func (g *DashboardHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { // currently messages are sent when a dashboard starts editing
// TODO -- verify and keep track of editors? func (g *DashboardHandler) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error {
return e.Data, nil return nil
} }
// DashboardSaved should broadcast to the appropriate stream // DashboardSaved should broadcast to the appropriate stream
......
...@@ -33,9 +33,8 @@ func (m *MeasurementsRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subs ...@@ -33,9 +33,8 @@ func (m *MeasurementsRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subs
return nil return nil
} }
// OnPublish is called when an event is received from the websocket. // AllowBroadcast checks if a message from the websocket can be broadcast on this channel
func (m *MeasurementsRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { // Currently this sends measurements over websocket -- should be replaced with the HTTP interface
// currently generic... but should be stricter func (m *MeasurementsRunner) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error {
// logger.Debug("Measurements runner got event on channel", "channel", e.Channel) return nil
return e.Data, nil
} }
...@@ -73,9 +73,9 @@ func (g *testDataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscrib ...@@ -73,9 +73,9 @@ func (g *testDataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscrib
return nil return nil
} }
// OnPublish is called when an event is received from the websocket. // AllowBroadcast checks if a message from the websocket can be broadcast on this channel
func (g *testDataRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { func (g *testDataRunner) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error {
return nil, fmt.Errorf("can not publish to testdata") return fmt.Errorf("can not publish to testdata")
} }
// runRandomCSV is just for an example. // runRandomCSV is just for an example.
......
package live package live
import ( import (
"encoding/json"
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
...@@ -155,7 +154,9 @@ func (g *GrafanaLive) Init() error { ...@@ -155,7 +154,9 @@ func (g *GrafanaLive) Init() error {
logger.Debug("unsubscribe from channel", "channel", e.Channel, "user", c.UserID()) logger.Debug("unsubscribe from channel", "channel", e.Channel, "user", c.UserID())
}) })
// Called when something is written to the websocket // Called when a client writes to the websocket channel.
// In general, we should prefer writing to the HTTP API, but this
// allows some simple prototypes to work quickly
node.OnPublish(func(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { node.OnPublish(func(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
reply := centrifuge.PublishReply{} reply := centrifuge.PublishReply{}
handler, err := g.GetChannelHandler(e.Channel) handler, err := g.GetChannelHandler(e.Channel)
...@@ -163,14 +164,8 @@ func (g *GrafanaLive) Init() error { ...@@ -163,14 +164,8 @@ func (g *GrafanaLive) Init() error {
return reply, err return reply, err
} }
data, err := handler.OnPublish(c, e) err = handler.AllowBroadcast(c, e)
if err != nil { return centrifuge.PublishReply{}, err
return reply, err
}
if len(data) > 0 {
_, err = node.Publish(e.Channel, e.Data)
}
return centrifuge.PublishReply{}, err // returns an error if it could not publish
}) })
// Run node. This method does not block. // Run node. This method does not block.
...@@ -199,26 +194,8 @@ func (g *GrafanaLive) Init() error { ...@@ -199,26 +194,8 @@ func (g *GrafanaLive) Init() error {
return return
} }
dto := models.UserProfileDTO{
Id: user.UserId,
Name: user.Name,
Email: user.Email,
Login: user.Login,
IsGrafanaAdmin: user.IsGrafanaAdmin,
OrgId: user.OrgId,
}
jsonData, err := json.Marshal(dto)
if err != nil {
logger.Debug("error reading user", "dto", dto)
ctx.Resp.WriteHeader(404)
return
}
logger.Info("Logged in user", "user", user)
cred := &centrifuge.Credentials{ cred := &centrifuge.Credentials{
UserID: fmt.Sprintf("%d", user.UserId), UserID: fmt.Sprintf("%d", user.UserId),
Info: jsonData,
} }
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred) newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred)
......
...@@ -26,7 +26,7 @@ func (g *PluginHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscribe ...@@ -26,7 +26,7 @@ func (g *PluginHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscribe
return nil // anyone can subscribe return nil // anyone can subscribe
} }
// OnPublish called when an event is received from the websocket // AllowBroadcast checks if a message from the websocket can be broadcast on this channel
func (g *PluginHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { func (g *PluginHandler) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error {
return e.Data, nil // broadcast any event return nil // broadcast any event
} }
...@@ -80,9 +80,9 @@ func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscrib ...@@ -80,9 +80,9 @@ func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.Subscrib
return nil return nil
} }
// OnPublish is called when an event is received from the websocket. // AllowBroadcast checks if a message from the websocket can be broadcast on this channel
func (r *logQueryRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) { func (r *logQueryRunner) AllowBroadcast(c *centrifuge.Client, e centrifuge.PublishEvent) error {
return nil, fmt.Errorf("can not publish") return fmt.Errorf("can not publish")
} }
func (r *logQueryRunner) publishResults(channelName string) error { func (r *logQueryRunner) publishResults(channelName string) error {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment