Commit c902ec18 by bergquist

Merge branch 'notification-service-refactor2'

* notification-service-refactor2:
  fix: removed log calls used while troubleshooting
  refactor: refactoring notification service to use new service registry hooks
parents 0cbeb56a 2cc855a1
...@@ -17,7 +17,6 @@ import ( ...@@ -17,7 +17,6 @@ import (
"github.com/grafana/grafana/pkg/middleware" "github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/notifications"
"github.com/grafana/grafana/pkg/services/provisioning" "github.com/grafana/grafana/pkg/services/provisioning"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
...@@ -32,10 +31,12 @@ import ( ...@@ -32,10 +31,12 @@ import (
"github.com/grafana/grafana/pkg/social" "github.com/grafana/grafana/pkg/social"
"github.com/grafana/grafana/pkg/tracing" "github.com/grafana/grafana/pkg/tracing"
// self registering services
_ "github.com/grafana/grafana/pkg/extensions" _ "github.com/grafana/grafana/pkg/extensions"
_ "github.com/grafana/grafana/pkg/plugins" _ "github.com/grafana/grafana/pkg/plugins"
_ "github.com/grafana/grafana/pkg/services/alerting" _ "github.com/grafana/grafana/pkg/services/alerting"
_ "github.com/grafana/grafana/pkg/services/cleanup" _ "github.com/grafana/grafana/pkg/services/cleanup"
_ "github.com/grafana/grafana/pkg/services/notifications"
_ "github.com/grafana/grafana/pkg/services/search" _ "github.com/grafana/grafana/pkg/services/search"
) )
...@@ -56,9 +57,9 @@ type GrafanaServerImpl struct { ...@@ -56,9 +57,9 @@ type GrafanaServerImpl struct {
shutdownFn context.CancelFunc shutdownFn context.CancelFunc
childRoutines *errgroup.Group childRoutines *errgroup.Group
log log.Logger log log.Logger
RouteRegister api.RouteRegister `inject:""`
HttpServer *api.HTTPServer `inject:""` RouteRegister api.RouteRegister `inject:""`
HttpServer *api.HTTPServer `inject:""`
} }
func (g *GrafanaServerImpl) Start() error { func (g *GrafanaServerImpl) Start() error {
...@@ -83,10 +84,6 @@ func (g *GrafanaServerImpl) Start() error { ...@@ -83,10 +84,6 @@ func (g *GrafanaServerImpl) Start() error {
} }
defer tracingCloser.Close() defer tracingCloser.Close()
if err = notifications.Init(); err != nil {
return fmt.Errorf("Notification service failed to initialize. error: %v", err)
}
serviceGraph := inject.Graph{} serviceGraph := inject.Graph{}
serviceGraph.Provide(&inject.Object{Value: bus.GetBus()}) serviceGraph.Provide(&inject.Object{Value: bus.GetBus()})
serviceGraph.Provide(&inject.Object{Value: dashboards.NewProvisioningService()}) serviceGraph.Provide(&inject.Object{Value: dashboards.NewProvisioningService()})
......
...@@ -11,44 +11,12 @@ import ( ...@@ -11,44 +11,12 @@ import (
"html/template" "html/template"
"net" "net"
"strconv" "strconv"
"strings"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models" m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
gomail "gopkg.in/mail.v2" gomail "gopkg.in/mail.v2"
) )
var mailQueue chan *Message
func initMailQueue() {
mailQueue = make(chan *Message, 10)
go processMailQueue()
}
func processMailQueue() {
for {
select {
case msg := <-mailQueue:
num, err := send(msg)
tos := strings.Join(msg.To, "; ")
info := ""
if err != nil {
if len(msg.Info) > 0 {
info = ", info: " + msg.Info
}
log.Error(4, fmt.Sprintf("Async sent email %d succeed, not send emails: %s%s err: %s", num, tos, info, err))
} else {
log.Trace(fmt.Sprintf("Async sent email %d succeed, sent emails: %s%s", num, tos, info))
}
}
}
}
var addToMailQueue = func(msg *Message) {
mailQueue <- msg
}
func send(msg *Message) (int, error) { func send(msg *Message) (int, error) {
dialer, err := createDialer() dialer, err := createDialer()
if err != nil { if err != nil {
......
...@@ -7,11 +7,13 @@ import ( ...@@ -7,11 +7,13 @@ import (
"html/template" "html/template"
"net/url" "net/url"
"path/filepath" "path/filepath"
"strings"
"github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/events" "github.com/grafana/grafana/pkg/events"
"github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models" m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/util"
) )
...@@ -21,20 +23,31 @@ var tmplResetPassword = "reset_password.html" ...@@ -21,20 +23,31 @@ var tmplResetPassword = "reset_password.html"
var tmplSignUpStarted = "signup_started.html" var tmplSignUpStarted = "signup_started.html"
var tmplWelcomeOnSignUp = "welcome_on_signup.html" var tmplWelcomeOnSignUp = "welcome_on_signup.html"
func Init() error { func init() {
initMailQueue() registry.RegisterService(&NotificationService{})
initWebhookQueue() }
type NotificationService struct {
Bus bus.Bus `inject:""`
mailQueue chan *Message
webhookQueue chan *Webhook
log log.Logger
}
bus.AddHandler("email", sendResetPasswordEmail) func (ns *NotificationService) Init() error {
bus.AddHandler("email", validateResetPasswordCode) ns.log = log.New("notifications")
bus.AddHandler("email", sendEmailCommandHandler) ns.mailQueue = make(chan *Message, 10)
ns.webhookQueue = make(chan *Webhook, 10)
bus.AddCtxHandler("email", sendEmailCommandHandlerSync) ns.Bus.AddHandler(ns.sendResetPasswordEmail)
ns.Bus.AddHandler(ns.validateResetPasswordCode)
ns.Bus.AddHandler(ns.sendEmailCommandHandler)
bus.AddCtxHandler("webhook", SendWebhookSync) ns.Bus.AddCtxHandler(ns.sendEmailCommandHandlerSync)
ns.Bus.AddCtxHandler(ns.SendWebhookSync)
bus.AddEventListener(signUpStartedHandler) ns.Bus.AddEventListener(ns.signUpStartedHandler)
bus.AddEventListener(signUpCompletedHandler) ns.Bus.AddEventListener(ns.signUpCompletedHandler)
mailTemplates = template.New("name") mailTemplates = template.New("name")
mailTemplates.Funcs(template.FuncMap{ mailTemplates.Funcs(template.FuncMap{
...@@ -58,8 +71,37 @@ func Init() error { ...@@ -58,8 +71,37 @@ func Init() error {
return nil return nil
} }
func SendWebhookSync(ctx context.Context, cmd *m.SendWebhookSync) error { func (ns *NotificationService) Run(ctx context.Context) error {
return sendWebRequestSync(ctx, &Webhook{ for {
select {
case webhook := <-ns.webhookQueue:
err := ns.sendWebRequestSync(context.Background(), webhook)
if err != nil {
ns.log.Error("Failed to send webrequest ", "error", err)
}
case msg := <-ns.mailQueue:
num, err := send(msg)
tos := strings.Join(msg.To, "; ")
info := ""
if err != nil {
if len(msg.Info) > 0 {
info = ", info: " + msg.Info
}
ns.log.Error(fmt.Sprintf("Async sent email %d succeed, not send emails: %s%s err: %s", num, tos, info, err))
} else {
ns.log.Debug(fmt.Sprintf("Async sent email %d succeed, sent emails: %s%s", num, tos, info))
}
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
func (ns *NotificationService) SendWebhookSync(ctx context.Context, cmd *m.SendWebhookSync) error {
return ns.sendWebRequestSync(ctx, &Webhook{
Url: cmd.Url, Url: cmd.Url,
User: cmd.User, User: cmd.User,
Password: cmd.Password, Password: cmd.Password,
...@@ -74,7 +116,7 @@ func subjectTemplateFunc(obj map[string]interface{}, value string) string { ...@@ -74,7 +116,7 @@ func subjectTemplateFunc(obj map[string]interface{}, value string) string {
return "" return ""
} }
func sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSync) error { func (ns *NotificationService) sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSync) error {
message, err := buildEmailMessage(&m.SendEmailCommand{ message, err := buildEmailMessage(&m.SendEmailCommand{
Data: cmd.Data, Data: cmd.Data,
Info: cmd.Info, Info: cmd.Info,
...@@ -89,24 +131,22 @@ func sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSyn ...@@ -89,24 +131,22 @@ func sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSyn
} }
_, err = send(message) _, err = send(message)
return err return err
} }
func sendEmailCommandHandler(cmd *m.SendEmailCommand) error { func (ns *NotificationService) sendEmailCommandHandler(cmd *m.SendEmailCommand) error {
message, err := buildEmailMessage(cmd) message, err := buildEmailMessage(cmd)
if err != nil { if err != nil {
return err return err
} }
addToMailQueue(message) ns.mailQueue <- message
return nil return nil
} }
func sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error { func (ns *NotificationService) sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error {
return sendEmailCommandHandler(&m.SendEmailCommand{ return ns.sendEmailCommandHandler(&m.SendEmailCommand{
To: []string{cmd.User.Email}, To: []string{cmd.User.Email},
Template: tmplResetPassword, Template: tmplResetPassword,
Data: map[string]interface{}{ Data: map[string]interface{}{
...@@ -116,7 +156,7 @@ func sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error { ...@@ -116,7 +156,7 @@ func sendResetPasswordEmail(cmd *m.SendResetPasswordEmailCommand) error {
}) })
} }
func validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error { func (ns *NotificationService) validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error {
login := getLoginForEmailCode(query.Code) login := getLoginForEmailCode(query.Code)
if login == "" { if login == "" {
return m.ErrInvalidEmailCode return m.ErrInvalidEmailCode
...@@ -135,18 +175,18 @@ func validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error { ...@@ -135,18 +175,18 @@ func validateResetPasswordCode(query *m.ValidateResetPasswordCodeQuery) error {
return nil return nil
} }
func signUpStartedHandler(evt *events.SignUpStarted) error { func (ns *NotificationService) signUpStartedHandler(evt *events.SignUpStarted) error {
if !setting.VerifyEmailEnabled { if !setting.VerifyEmailEnabled {
return nil return nil
} }
log.Info("User signup started: %s", evt.Email) ns.log.Info("User signup started", "email", evt.Email)
if evt.Email == "" { if evt.Email == "" {
return nil return nil
} }
err := sendEmailCommandHandler(&m.SendEmailCommand{ err := ns.sendEmailCommandHandler(&m.SendEmailCommand{
To: []string{evt.Email}, To: []string{evt.Email},
Template: tmplSignUpStarted, Template: tmplSignUpStarted,
Data: map[string]interface{}{ Data: map[string]interface{}{
...@@ -155,6 +195,7 @@ func signUpStartedHandler(evt *events.SignUpStarted) error { ...@@ -155,6 +195,7 @@ func signUpStartedHandler(evt *events.SignUpStarted) error {
"SignUpUrl": setting.ToAbsUrl(fmt.Sprintf("signup/?email=%s&code=%s", url.QueryEscape(evt.Email), url.QueryEscape(evt.Code))), "SignUpUrl": setting.ToAbsUrl(fmt.Sprintf("signup/?email=%s&code=%s", url.QueryEscape(evt.Email), url.QueryEscape(evt.Code))),
}, },
}) })
if err != nil { if err != nil {
return err return err
} }
...@@ -163,12 +204,12 @@ func signUpStartedHandler(evt *events.SignUpStarted) error { ...@@ -163,12 +204,12 @@ func signUpStartedHandler(evt *events.SignUpStarted) error {
return bus.Dispatch(&emailSentCmd) return bus.Dispatch(&emailSentCmd)
} }
func signUpCompletedHandler(evt *events.SignUpCompleted) error { func (ns *NotificationService) signUpCompletedHandler(evt *events.SignUpCompleted) error {
if evt.Email == "" || !setting.Smtp.SendWelcomeEmailOnSignUp { if evt.Email == "" || !setting.Smtp.SendWelcomeEmailOnSignUp {
return nil return nil
} }
return sendEmailCommandHandler(&m.SendEmailCommand{ return ns.sendEmailCommandHandler(&m.SendEmailCommand{
To: []string{evt.Email}, To: []string{evt.Email},
Template: tmplWelcomeOnSignUp, Template: tmplWelcomeOnSignUp,
Data: map[string]interface{}{ Data: map[string]interface{}{
......
...@@ -3,6 +3,7 @@ package notifications ...@@ -3,6 +3,7 @@ package notifications
import ( import (
"testing" "testing"
"github.com/grafana/grafana/pkg/bus"
m "github.com/grafana/grafana/pkg/models" m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
...@@ -17,25 +18,23 @@ type testTriggeredAlert struct { ...@@ -17,25 +18,23 @@ type testTriggeredAlert struct {
func TestNotifications(t *testing.T) { func TestNotifications(t *testing.T) {
Convey("Given the notifications service", t, func() { Convey("Given the notifications service", t, func() {
//bus.ClearBusHandlers()
setting.StaticRootPath = "../../../public/" setting.StaticRootPath = "../../../public/"
setting.Smtp.Enabled = true setting.Smtp.Enabled = true
setting.Smtp.TemplatesPattern = "emails/*.html" setting.Smtp.TemplatesPattern = "emails/*.html"
setting.Smtp.FromAddress = "from@address.com" setting.Smtp.FromAddress = "from@address.com"
setting.Smtp.FromName = "Grafana Admin" setting.Smtp.FromName = "Grafana Admin"
err := Init() ns := &NotificationService{}
So(err, ShouldBeNil) ns.Bus = bus.New()
var sentMsg *Message err := ns.Init()
addToMailQueue = func(msg *Message) { So(err, ShouldBeNil)
sentMsg = msg
}
Convey("When sending reset email password", func() { Convey("When sending reset email password", func() {
err := sendResetPasswordEmail(&m.SendResetPasswordEmailCommand{User: &m.User{Email: "asd@asd.com"}}) err := ns.sendResetPasswordEmail(&m.SendResetPasswordEmailCommand{User: &m.User{Email: "asd@asd.com"}})
So(err, ShouldBeNil) So(err, ShouldBeNil)
sentMsg := <-ns.mailQueue
So(sentMsg.Body, ShouldContainSubstring, "body") So(sentMsg.Body, ShouldContainSubstring, "body")
So(sentMsg.Subject, ShouldEqual, "Reset your Grafana password - asd@asd.com") So(sentMsg.Subject, ShouldEqual, "Reset your Grafana password - asd@asd.com")
So(sentMsg.Body, ShouldNotContainSubstring, "Subject") So(sentMsg.Body, ShouldNotContainSubstring, "Subject")
......
...@@ -12,8 +12,6 @@ import ( ...@@ -12,8 +12,6 @@ import (
func TestEmailIntegrationTest(t *testing.T) { func TestEmailIntegrationTest(t *testing.T) {
SkipConvey("Given the notifications service", t, func() { SkipConvey("Given the notifications service", t, func() {
bus.ClearBusHandlers()
setting.StaticRootPath = "../../../public/" setting.StaticRootPath = "../../../public/"
setting.Smtp.Enabled = true setting.Smtp.Enabled = true
setting.Smtp.TemplatesPattern = "emails/*.html" setting.Smtp.TemplatesPattern = "emails/*.html"
...@@ -21,14 +19,11 @@ func TestEmailIntegrationTest(t *testing.T) { ...@@ -21,14 +19,11 @@ func TestEmailIntegrationTest(t *testing.T) {
setting.Smtp.FromName = "Grafana Admin" setting.Smtp.FromName = "Grafana Admin"
setting.BuildVersion = "4.0.0" setting.BuildVersion = "4.0.0"
err := Init() ns := &NotificationService{}
So(err, ShouldBeNil) ns.Bus = bus.New()
addToMailQueue = func(msg *Message) { err := ns.Init()
So(msg.From, ShouldEqual, "Grafana Admin <from@address.com>") So(err, ShouldBeNil)
So(msg.To[0], ShouldEqual, "asdf@asdf.com")
ioutil.WriteFile("../../../tmp/test_email.html", []byte(msg.Body), 0777)
}
Convey("When sending reset email password", func() { Convey("When sending reset email password", func() {
cmd := &m.SendEmailCommand{ cmd := &m.SendEmailCommand{
...@@ -59,8 +54,13 @@ func TestEmailIntegrationTest(t *testing.T) { ...@@ -59,8 +54,13 @@ func TestEmailIntegrationTest(t *testing.T) {
Template: "alert_notification.html", Template: "alert_notification.html",
} }
err := sendEmailCommandHandler(cmd) err := ns.sendEmailCommandHandler(cmd)
So(err, ShouldBeNil) So(err, ShouldBeNil)
sentMsg := <-ns.mailQueue
So(sentMsg.From, ShouldEqual, "Grafana Admin <from@address.com>")
So(sentMsg.To[0], ShouldEqual, "asdf@asdf.com")
ioutil.WriteFile("../../../tmp/test_email.html", []byte(sentMsg.Body), 0777)
}) })
}) })
} }
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
"golang.org/x/net/context/ctxhttp" "golang.org/x/net/context/ctxhttp"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/util"
) )
...@@ -37,32 +36,8 @@ var netClient = &http.Client{ ...@@ -37,32 +36,8 @@ var netClient = &http.Client{
Transport: netTransport, Transport: netTransport,
} }
var ( func (ns *NotificationService) sendWebRequestSync(ctx context.Context, webhook *Webhook) error {
webhookQueue chan *Webhook ns.log.Debug("Sending webhook", "url", webhook.Url, "http method", webhook.HttpMethod)
webhookLog log.Logger
)
func initWebhookQueue() {
webhookLog = log.New("notifications.webhook")
webhookQueue = make(chan *Webhook, 10)
go processWebhookQueue()
}
func processWebhookQueue() {
for {
select {
case webhook := <-webhookQueue:
err := sendWebRequestSync(context.Background(), webhook)
if err != nil {
webhookLog.Error("Failed to send webrequest ", "error", err)
}
}
}
}
func sendWebRequestSync(ctx context.Context, webhook *Webhook) error {
webhookLog.Debug("Sending webhook", "url", webhook.Url, "http method", webhook.HttpMethod)
if webhook.HttpMethod == "" { if webhook.HttpMethod == "" {
webhook.HttpMethod = http.MethodPost webhook.HttpMethod = http.MethodPost
...@@ -98,6 +73,6 @@ func sendWebRequestSync(ctx context.Context, webhook *Webhook) error { ...@@ -98,6 +73,6 @@ func sendWebRequestSync(ctx context.Context, webhook *Webhook) error {
return err return err
} }
webhookLog.Debug("Webhook failed", "statuscode", resp.Status, "body", string(body)) ns.log.Debug("Webhook failed", "statuscode", resp.Status, "body", string(body))
return fmt.Errorf("Webhook response status %v", resp.Status) return fmt.Errorf("Webhook response status %v", resp.Status)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment