Commit 37523791 by Torkel Ödegaard

Worked on event system, needs a little more work

parent a712f1a2
......@@ -121,7 +121,7 @@ daily_rotate = true
; Expired days of log file(delete after max days), default is 7
max_days = 7
[notifications]
[event_publisher]
enabled = false
rabbitmq_url = amqp://localhost/
notifications_exchange = notifications
exchange = grafana_events
Subproject commit 0fe83d51981333600f1e3801044fc1cfd5acf1ae
Subproject commit 07ec00641fb6d633dc2914ff433e61db1ef8a313
......@@ -11,13 +11,16 @@ type Msg interface{}
type Bus interface {
Dispatch(msg Msg) error
Publish(msg Msg) error
AddHandler(handler HandlerFunc)
AddEventListener(handler HandlerFunc)
AddWildcardListener(handler HandlerFunc)
}
type InProcBus struct {
handlers map[string]HandlerFunc
listeners map[string][]HandlerFunc
handlers map[string]HandlerFunc
listeners map[string][]HandlerFunc
wildcardListeners []HandlerFunc
}
// temp stuff, not sure how to handle bus instance, and init yet
......@@ -27,6 +30,7 @@ func New() Bus {
bus := &InProcBus{}
bus.handlers = make(map[string]HandlerFunc)
bus.listeners = make(map[string][]HandlerFunc)
bus.wildcardListeners = make([]HandlerFunc, 0)
return bus
}
......@@ -53,9 +57,6 @@ func (b *InProcBus) Dispatch(msg Msg) error {
func (b *InProcBus) Publish(msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name()
var listeners = b.listeners[msgName]
if len(listeners) == 0 {
return nil
}
var params = make([]reflect.Value, 1)
params[0] = reflect.ValueOf(msg)
......@@ -68,9 +69,21 @@ func (b *InProcBus) Publish(msg Msg) error {
}
}
for _, listenerHandler := range b.wildcardListeners {
ret := reflect.ValueOf(listenerHandler).Call(params)
err := ret[0].Interface()
if err != nil {
return err.(error)
}
}
return nil
}
func (b *InProcBus) AddWildcardListener(handler HandlerFunc) {
b.wildcardListeners = append(b.wildcardListeners, handler)
}
func (b *InProcBus) AddHandler(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
queryTypeName := handlerType.In(0).Elem().Name()
......@@ -97,10 +110,14 @@ func AddEventListener(handler HandlerFunc) {
globalBus.AddEventListener(handler)
}
func AddWildcardListener(handler HandlerFunc) {
globalBus.AddWildcardListener(handler)
}
func Dispatch(msg Msg) error {
return globalBus.Dispatch(msg)
}
func Publish(msg Msg) error {
return globalBus.Publish(msg)
}
\ No newline at end of file
}
......@@ -2,6 +2,7 @@ package bus
import (
"errors"
"fmt"
"testing"
)
......@@ -62,7 +63,7 @@ func TestEventListeners(t *testing.T) {
if err != nil {
t.Fatal("Publish event failed " + err.Error())
} else if count != 0 {
t.Fatal("Publish event failed, listeners called: %v, expected: %v", count, 11)
} else if count != 11 {
t.Fatal(fmt.Sprintf("Publish event failed, listeners called: %v, expected: %v", count, 11))
}
}
......@@ -16,10 +16,10 @@ import (
"github.com/torkelo/grafana-pro/pkg/api"
"github.com/torkelo/grafana-pro/pkg/log"
"github.com/torkelo/grafana-pro/pkg/middleware"
"github.com/torkelo/grafana-pro/pkg/services/eventpublisher"
"github.com/torkelo/grafana-pro/pkg/services/sqlstore"
"github.com/torkelo/grafana-pro/pkg/setting"
"github.com/torkelo/grafana-pro/pkg/social"
"github.com/torkelo/grafana-pro/pkg/services/notification"
)
var CmdWeb = cli.Command{
......@@ -82,13 +82,9 @@ func runWeb(c *cli.Context) {
social.NewOAuthService()
sqlstore.NewEngine()
sqlstore.EnsureAdminUser()
eventpublisher.Init()
var err error
if setting.NotificationsEnabled {
err = notification.Init(setting.RabbitmqUrl, setting.NotificationsExchange)
if err != nil {
log.Fatal(4, "Failed to connect to notification queue: %v", err)
}
}
m := newMacaron()
api.Register(m)
......
package events
// Events can be passed to external systems via for example AMPQ
// Treat these events as basically DTOs so changes has to be backward compatible
type AccountCreated struct {
Name string `json:"name"`
}
package events
import (
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func TestEventCreation(t *testing.T) {
Convey("When generating slug", t, func() {
dashboard := NewDashboard("Grafana Play Home")
dashboard.UpdateSlug()
So(dashboard.Slug, ShouldEqual, "grafana-play-home")
})
}
package notification
package eventpublisher
import (
"encoding/json"
"fmt"
"log"
"reflect"
"time"
"encoding/json"
"github.com/streadway/amqp"
"github.com/torkelo/grafana-pro/pkg/bus"
m "github.com/torkelo/grafana-pro/pkg/models"
"github.com/torkelo/grafana-pro/pkg/setting"
)
var (
url string
url string
exchange string
conn *amqp.Connection
channel *amqp.Channel
conn *amqp.Connection
channel *amqp.Channel
)
func getConnection() (*amqp.Connection, error) {
c, err := amqp.Dial(url)
if err != nil {
return nil, err
return nil, err
}
return c, err
}
......@@ -31,25 +34,35 @@ func getChannel() (*amqp.Channel, error) {
}
err = ch.ExchangeDeclare(
exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if (err != nil) {
if err != nil {
return nil, err
}
return ch, err
}
func Init(rabbitUrl string, exchangeName string) error {
url = rabbitUrl
exchange = exchangeName
bus.AddEventListener(NotificationHandler)
return Setup()
func Init() {
sec := setting.Cfg.Section("event_publisher")
if !sec.Key("enabled").MustBool(false) {
return
}
url = sec.Key("rabbitmq_url").String()
exchange = sec.Key("exchange").String()
bus.AddWildcardListener(eventListener)
if err := Setup(); err != nil {
log.Fatal(4, "Failed to connect to notification queue: %v", err)
return
}
}
// Every connection should declare the topology they expect
......@@ -82,7 +95,7 @@ func Setup() error {
//could not create channel, so lets close the connection
// and re-create.
_ = conn.Close()
for err != nil {
time.Sleep(2 * time.Second)
fmt.Println("attempting to reconnect to rabbitmq.")
......@@ -92,39 +105,42 @@ func Setup() error {
}
}()
return nil
return nil
}
func Publish(routingKey string, msgString []byte) {
func publish(routingKey string, msgString []byte) {
err := channel.Publish(
exchange, //exchange
routingKey, // routing key
false, // mandatory
exchange, //exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: msgString,
Body: msgString,
},
)
if err != nil {
// failures are most likely because the connection was lost.
// the connection will be re-established, so just keep
// the connection will be re-established, so just keep
// retrying every 2seconds until we successfully publish.
time.Sleep(2 * time.Second)
fmt.Println("publish failed, retrying.");
Publish(routingKey, msgString)
fmt.Println("publish failed, retrying.")
publish(routingKey, msgString)
}
return
}
func NotificationHandler(event *m.Notification) error {
func eventListener(event interface{}) error {
msgString, err := json.Marshal(event)
if err != nil {
return err
}
routingKey := fmt.Sprintf("%s.%s", event.Priority, event.EventType)
eventType := reflect.TypeOf(event)
routingKey := fmt.Sprintf("%s.%s", "INFO", eventType.Name())
// this is run in a greenthread and we expect that publish will keep
// retrying until the message gets sent.
go Publish(routingKey, msgString)
go publish(routingKey, msgString)
return nil
}
\ No newline at end of file
}
......@@ -6,6 +6,7 @@ import (
"github.com/go-xorm/xorm"
"github.com/torkelo/grafana-pro/pkg/bus"
"github.com/torkelo/grafana-pro/pkg/events"
m "github.com/torkelo/grafana-pro/pkg/models"
)
......@@ -48,7 +49,7 @@ func GetAccountByName(query *m.GetAccountByNameQuery) error {
}
func CreateAccount(cmd *m.CreateAccountCommand) error {
return inTransaction(func(sess *xorm.Session) error {
return inTransaction2(func(sess *session) error {
account := m.Account{
Name: cmd.Name,
......@@ -60,7 +61,6 @@ func CreateAccount(cmd *m.CreateAccountCommand) error {
return err
}
// create inital admin account user
user := m.AccountUser{
AccountId: account.Id,
UserId: cmd.UserId,
......@@ -72,6 +72,8 @@ func CreateAccount(cmd *m.CreateAccountCommand) error {
_, err := sess.Insert(&user)
cmd.Result = account
sess.publishAfterCommit(&events.AccountCreated{})
// silently ignore failures to publish events.
_ = bus.Publish(&m.Notification{
EventType: "account.create",
......@@ -79,7 +81,7 @@ func CreateAccount(cmd *m.CreateAccountCommand) error {
Priority: m.PRIO_INFO,
Payload: account,
})
return err
})
}
......
package sqlstore
import (
"github.com/go-xorm/xorm"
"github.com/torkelo/grafana-pro/pkg/bus"
"github.com/torkelo/grafana-pro/pkg/log"
)
type dbTransactionFunc func(sess *xorm.Session) error
type dbTransactionFunc2 func(sess *session) error
type session struct {
*xorm.Session
events []interface{}
}
func (sess *session) publishAfterCommit(msg interface{}) {
sess.events = append(sess.events, msg)
}
func inTransaction(callback dbTransactionFunc) error {
var err error
sess := x.NewSession()
defer sess.Close()
if err = sess.Begin(); err != nil {
return err
}
err = callback(sess)
if err != nil {
sess.Rollback()
return err
} else if err = sess.Commit(); err != nil {
return err
}
return nil
}
func inTransaction2(callback dbTransactionFunc2) error {
var err error
sess := session{Session: x.NewSession()}
defer sess.Close()
if err = sess.Begin(); err != nil {
return err
}
err = callback(&sess)
if err != nil {
sess.Rollback()
return err
} else if err = sess.Commit(); err != nil {
return err
}
if len(sess.events) > 0 {
for _, e := range sess.events {
if err = bus.Publish(e); err != nil {
log.Error(3, "Failed to publish event after commit", err)
}
}
}
return nil
}
......@@ -149,27 +149,3 @@ func LoadConfig() {
DbCfg.SslMode = sec.Key("ssl_mode").String()
DbCfg.Path = sec.Key("path").MustString("data/grafana.db")
}
type dbTransactionFunc func(sess *xorm.Session) error
func inTransaction(callback dbTransactionFunc) error {
var err error
sess := x.NewSession()
defer sess.Close()
if err = sess.Begin(); err != nil {
return err
}
err = callback(sess)
if err != nil {
sess.Rollback()
return err
} else if err = sess.Commit(); err != nil {
return err
}
return nil
}
......@@ -93,12 +93,6 @@ var (
// PhantomJs Rendering
ImagesDir string
PhantomDir string
//Notifications
NotificationsEnabled bool
RabbitmqUrl string
NotificationsExchange string
)
func init() {
......@@ -228,7 +222,7 @@ func NewConfigContext() {
// Notifications
NotificationsEnabled = Cfg.Section("notifications").Key("enabled").MustBool(false)
RabbitmqUrl = Cfg.Section("notifications").Key("rabbitmq_url").MustString("amqp://localhost/")
// validate rabbitmqUrl.
_, err = url.Parse(RabbitmqUrl)
if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment