Commit c38f6ff1 by Carl Bergquist Committed by Torkel Ödegaard

Make alerting notifcations sync (#6158)

* tech(routines): move the async logic from notification to alerting notifier

* tech(notification): reduce code dupe

* fix(notification): dont touch the response unless its an error

* feat(alerting): make alerting exeuction async but flow sync

* tech(alerting): remove commented code

* tech(alerting): remove unused code

* tech(alerting): fix typo

* tech(alerting): implement Context on EvalContext

* tech(alerting): wait for all alerts to return

* feat(alerting): dont allow alert responses to cancel

* Revert "feat(alerting): dont allow alert responses to cancel"

This reverts commit 324b006c96687da18a542942f39c10c99119430c.

* feat(alerting): give alerts some time to finish before closing down
parent 36f0bf0e
package api
import (
"context"
"encoding/json"
"net/http"
......@@ -31,7 +32,7 @@ func QueryMetrics(c *middleware.Context, reqDto dtos.MetricRequest) Response {
})
}
resp, err := tsdb.HandleRequest(request)
resp, err := tsdb.HandleRequest(context.TODO(), request)
if err != nil {
return ApiError(500, "Metric request error", err)
}
......
package bus
import (
"context"
"fmt"
"reflect"
)
type HandlerFunc interface{}
type CtxHandlerFunc func()
type Msg interface{}
type Bus interface {
Dispatch(msg Msg) error
DispatchCtx(ctx context.Context, msg Msg) error
Publish(msg Msg) error
AddHandler(handler HandlerFunc)
AddCtxHandler(handler HandlerFunc)
AddEventListener(handler HandlerFunc)
AddWildcardListener(handler HandlerFunc)
}
......@@ -34,6 +38,27 @@ func New() Bus {
return bus
}
func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name()
var handler = b.handlers[msgName]
if handler == nil {
return fmt.Errorf("handler not found for %s", msgName)
}
var params = make([]reflect.Value, 2)
params[0] = reflect.ValueOf(ctx)
params[1] = reflect.ValueOf(msg)
ret := reflect.ValueOf(handler).Call(params)
err := ret[0].Interface()
if err == nil {
return nil
} else {
return err.(error)
}
}
func (b *InProcBus) Dispatch(msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name()
......@@ -90,6 +115,12 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) {
b.handlers[queryTypeName] = handler
}
func (b *InProcBus) AddCtxHandler(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
queryTypeName := handlerType.In(1).Elem().Name()
b.handlers[queryTypeName] = handler
}
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
eventName := handlerType.In(0).Elem().Name()
......@@ -106,6 +137,11 @@ func AddHandler(implName string, handler HandlerFunc) {
}
// Package level functions
func AddCtxHandler(implName string, handler HandlerFunc) {
globalBus.AddCtxHandler(handler)
}
// Package level functions
func AddEventListener(handler HandlerFunc) {
globalBus.AddEventListener(handler)
}
......@@ -118,6 +154,10 @@ func Dispatch(msg Msg) error {
return globalBus.Dispatch(msg)
}
func DispatchCtx(ctx context.Context, msg Msg) error {
return globalBus.DispatchCtx(ctx, msg)
}
func Publish(msg Msg) error {
return globalBus.Publish(msg)
}
......
......@@ -12,6 +12,10 @@ type SendEmailCommand struct {
Info string
}
type SendEmailCommandSync struct {
SendEmailCommand
}
type SendWebhook struct {
Url string
User string
......@@ -19,6 +23,13 @@ type SendWebhook struct {
Body string
}
type SendWebhookSync struct {
Url string
User string
Password string
Body string
}
type SendResetPasswordEmailCommand struct {
User *User
}
......
......@@ -82,7 +82,7 @@ func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange *
req := c.getRequestForAlertRule(getDsInfo.Result, timeRange)
result := make(tsdb.TimeSeriesSlice, 0)
resp, err := c.HandleRequest(req)
resp, err := c.HandleRequest(context.Context, req)
if err != nil {
return nil, fmt.Errorf("tsdb.HandleRequest() error %v", err)
}
......
package conditions
import (
"context"
"testing"
null "gopkg.in/guregu/null.v3"
......@@ -137,7 +138,7 @@ func (ctx *queryConditionTestContext) exec() {
ctx.condition = condition
condition.HandleRequest = func(req *tsdb.Request) (*tsdb.Response, error) {
condition.HandleRequest = func(context context.Context, req *tsdb.Request) (*tsdb.Response, error) {
return &tsdb.Response{
Results: map[string]*tsdb.QueryResult{
"A": {Series: ctx.series},
......
......@@ -11,7 +11,6 @@ import (
type Engine struct {
execQueue chan *Job
resultQueue chan *EvalContext
clock clock.Clock
ticker *Ticker
scheduler Scheduler
......@@ -25,7 +24,6 @@ func NewEngine() *Engine {
e := &Engine{
ticker: NewTicker(time.Now(), time.Second*0, clock.New()),
execQueue: make(chan *Job, 1000),
resultQueue: make(chan *EvalContext, 1000),
scheduler: NewScheduler(),
evalHandler: NewEvalHandler(),
ruleReader: NewRuleReader(),
......@@ -39,23 +37,17 @@ func NewEngine() *Engine {
func (e *Engine) Run(ctx context.Context) error {
e.log.Info("Initializing Alerting")
g, ctx := errgroup.WithContext(ctx)
alertGroup, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return e.alertingTicker(ctx) })
g.Go(func() error { return e.execDispatcher(ctx) })
g.Go(func() error { return e.resultDispatcher(ctx) })
alertGroup.Go(func() error { return e.alertingTicker(ctx) })
alertGroup.Go(func() error { return e.runJobDispatcher(ctx) })
err := g.Wait()
err := alertGroup.Wait()
e.log.Info("Stopped Alerting", "reason", err)
return err
}
func (e *Engine) Stop() {
close(e.execQueue)
close(e.resultQueue)
}
func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
defer func() {
if err := recover(); err != nil {
......@@ -81,69 +73,58 @@ func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
}
}
func (e *Engine) execDispatcher(grafanaCtx context.Context) error {
func (e *Engine) runJobDispatcher(grafanaCtx context.Context) error {
dispatcherGroup, alertCtx := errgroup.WithContext(grafanaCtx)
for {
select {
case <-grafanaCtx.Done():
close(e.resultQueue)
return grafanaCtx.Err()
return dispatcherGroup.Wait()
case job := <-e.execQueue:
go e.executeJob(grafanaCtx, job)
dispatcherGroup.Go(func() error { return e.processJob(alertCtx, job) })
}
}
}
func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) error {
var (
unfinishedWorkTimeout time.Duration = time.Second * 5
alertTimeout time.Duration = time.Second * 30
)
func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error {
defer func() {
if err := recover(); err != nil {
e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1))
e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1))
}
}()
done := make(chan *EvalContext, 1)
go func() {
alertCtx, cancelFn := context.WithTimeout(context.TODO(), alertTimeout)
job.Running = true
context := NewEvalContext(job.Rule)
e.evalHandler.Eval(context)
job.Running = false
done <- context
evalContext := NewEvalContext(alertCtx, job.Rule)
done := make(chan struct{})
go func() {
e.evalHandler.Eval(evalContext)
e.resultHandler.Handle(evalContext)
close(done)
}()
var err error = nil
select {
case <-grafanaCtx.Done():
return grafanaCtx.Err()
case evalContext := <-done:
e.resultQueue <- evalContext
}
return nil
}
func (e *Engine) resultDispatcher(grafanaCtx context.Context) error {
for {
select {
case <-grafanaCtx.Done():
//handle all responses before shutting down.
for result := range e.resultQueue {
e.handleResponse(result)
case <-time.After(unfinishedWorkTimeout):
cancelFn()
err = grafanaCtx.Err()
case <-done:
}
return grafanaCtx.Err()
case result := <-e.resultQueue:
e.handleResponse(result)
case <-done:
}
}
}
func (e *Engine) handleResponse(result *EvalContext) {
defer func() {
if err := recover(); err != nil {
e.log.Error("Panic in resultDispatcher", "error", err, "stack", log.Stack(1))
}
}()
e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "firing", result.Firing)
e.resultHandler.Handle(result)
e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing)
job.Running = false
cancelFn()
return err
}
package alerting
import (
"context"
"fmt"
"time"
......@@ -20,14 +21,30 @@ type EvalContext struct {
StartTime time.Time
EndTime time.Time
Rule *Rule
DoneChan chan bool
CancelChan chan bool
log log.Logger
dashboardSlug string
ImagePublicUrl string
ImageOnDiskPath string
NoDataFound bool
RetryCount int
Context context.Context
}
func (evalContext *EvalContext) Deadline() (deadline time.Time, ok bool) {
return evalContext.Deadline()
}
func (evalContext *EvalContext) Done() <-chan struct{} {
return evalContext.Context.Done()
}
func (evalContext *EvalContext) Err() error {
return evalContext.Context.Err()
}
func (evalContext *EvalContext) Value(key interface{}) interface{} {
return evalContext.Context.Value(key)
}
type StateDescription struct {
......@@ -94,14 +111,13 @@ func (c *EvalContext) GetRuleUrl() (string, error) {
}
}
func NewEvalContext(rule *Rule) *EvalContext {
func NewEvalContext(alertCtx context.Context, rule *Rule) *EvalContext {
return &EvalContext{
Context: alertCtx,
StartTime: time.Now(),
Rule: rule,
Logs: make([]*ResultLogEntry, 0),
EvalMatches: make([]*EvalMatch, 0),
DoneChan: make(chan bool, 1),
CancelChan: make(chan bool, 1),
log: log.New("alerting.evalContext"),
RetryCount: 0,
}
......
package alerting
import (
"fmt"
"time"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/metrics"
)
var (
MaxRetries int = 1
)
type DefaultEvalHandler struct {
log log.Logger
alertJobTimeout time.Duration
......@@ -20,49 +15,11 @@ type DefaultEvalHandler struct {
func NewEvalHandler() *DefaultEvalHandler {
return &DefaultEvalHandler{
log: log.New("alerting.evalHandler"),
alertJobTimeout: time.Second * 15,
alertJobTimeout: time.Second * 5,
}
}
func (e *DefaultEvalHandler) Eval(context *EvalContext) {
go e.eval(context)
select {
case <-time.After(e.alertJobTimeout):
context.Error = fmt.Errorf("Execution timed out after %v", e.alertJobTimeout)
context.EndTime = time.Now()
e.log.Debug("Job Execution timeout", "alertId", context.Rule.Id, "timeout setting", e.alertJobTimeout)
e.retry(context)
case <-context.DoneChan:
e.log.Debug("Job Execution done", "timeMs", context.GetDurationMs(), "alertId", context.Rule.Id, "firing", context.Firing)
if context.Error != nil {
e.retry(context)
}
}
}
func (e *DefaultEvalHandler) retry(context *EvalContext) {
e.log.Debug("Retrying eval exeuction", "alertId", context.Rule.Id)
if context.RetryCount < MaxRetries {
context.DoneChan = make(chan bool, 1)
context.CancelChan = make(chan bool, 1)
context.RetryCount++
e.Eval(context)
}
}
func (e *DefaultEvalHandler) eval(context *EvalContext) {
defer func() {
if err := recover(); err != nil {
e.log.Error("Alerting rule eval panic", "error", err, "stack", log.Stack(1))
if panicErr, ok := err.(error); ok {
context.Error = panicErr
}
}
}()
for _, condition := range context.Rule.Conditions {
condition.Eval(context)
......@@ -80,5 +37,4 @@ func (e *DefaultEvalHandler) eval(context *EvalContext) {
context.EndTime = time.Now()
elapsedTime := context.EndTime.Sub(context.StartTime) / time.Millisecond
metrics.M_Alerting_Exeuction_Time.Update(elapsedTime)
context.DoneChan <- true
}
package alerting
import (
"context"
"testing"
. "github.com/smartystreets/goconvey/convey"
......@@ -19,25 +20,25 @@ func TestAlertingExecutor(t *testing.T) {
handler := NewEvalHandler()
Convey("Show return triggered with single passing condition", func() {
context := NewEvalContext(&Rule{
context := NewEvalContext(context.TODO(), &Rule{
Conditions: []Condition{&conditionStub{
firing: true,
}},
})
handler.eval(context)
handler.Eval(context)
So(context.Firing, ShouldEqual, true)
})
Convey("Show return false with not passing condition", func() {
context := NewEvalContext(&Rule{
context := NewEvalContext(context.TODO(), &Rule{
Conditions: []Condition{
&conditionStub{firing: true},
&conditionStub{firing: false},
},
})
handler.eval(context)
handler.Eval(context)
So(context.Firing, ShouldEqual, false)
})
})
......
package alerting
import (
"time"
)
import "time"
type EvalHandler interface {
Eval(context *EvalContext)
Eval(evalContext *EvalContext)
}
type Scheduler interface {
......@@ -14,7 +12,7 @@ type Scheduler interface {
}
type Notifier interface {
Notify(alertResult *EvalContext)
Notify(evalContext *EvalContext) error
GetType() string
NeedsImage() bool
PassesFilter(rule *Rule) bool
......
......@@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/imguploader"
"github.com/grafana/grafana/pkg/components/renderer"
......@@ -33,32 +35,36 @@ func (n *RootNotifier) PassesFilter(rule *Rule) bool {
return false
}
func (n *RootNotifier) Notify(context *EvalContext) {
func (n *RootNotifier) Notify(context *EvalContext) error {
n.log.Info("Sending notifications for", "ruleId", context.Rule.Id)
notifiers, err := n.getNotifiers(context.Rule.OrgId, context.Rule.Notifications, context)
if err != nil {
n.log.Error("Failed to read notifications", "error", err)
return
return err
}
if len(notifiers) == 0 {
return
return nil
}
err = n.uploadImage(context)
if err != nil {
n.log.Error("Failed to upload alert panel image", "error", err)
return err
}
n.sendNotifications(notifiers, context)
return n.sendNotifications(context, notifiers)
}
func (n *RootNotifier) sendNotifications(notifiers []Notifier, context *EvalContext) {
func (n *RootNotifier) sendNotifications(context *EvalContext, notifiers []Notifier) error {
g, _ := errgroup.WithContext(context.Context)
for _, notifier := range notifiers {
n.log.Info("Sending notification", "firing", context.Firing, "type", notifier.GetType())
go notifier.Notify(context)
g.Go(func() error { return notifier.Notify(context) })
}
return g.Wait()
}
func (n *RootNotifier) uploadImage(context *EvalContext) (err error) {
......
......@@ -22,7 +22,7 @@ func (fn *FakeNotifier) NeedsImage() bool {
return true
}
func (fn *FakeNotifier) Notify(alertResult *EvalContext) {}
func (fn *FakeNotifier) Notify(alertResult *EvalContext) error { return nil }
func (fn *FakeNotifier) PassesFilter(rule *Rule) bool {
return fn.FakeMatchResult
......
......@@ -35,33 +35,39 @@ func NewEmailNotifier(model *m.AlertNotification) (alerting.Notifier, error) {
}, nil
}
func (this *EmailNotifier) Notify(context *alerting.EvalContext) {
func (this *EmailNotifier) Notify(evalContext *alerting.EvalContext) error {
this.log.Info("Sending alert notification to", "addresses", this.Addresses)
metrics.M_Alerting_Notification_Sent_Email.Inc(1)
ruleUrl, err := context.GetRuleUrl()
ruleUrl, err := evalContext.GetRuleUrl()
if err != nil {
this.log.Error("Failed get rule link", "error", err)
return
return err
}
cmd := &m.SendEmailCommand{
cmd := &m.SendEmailCommandSync{
SendEmailCommand: m.SendEmailCommand{
Data: map[string]interface{}{
"Title": context.GetNotificationTitle(),
"State": context.Rule.State,
"Name": context.Rule.Name,
"StateModel": context.GetStateModel(),
"Message": context.Rule.Message,
"Title": evalContext.GetNotificationTitle(),
"State": evalContext.Rule.State,
"Name": evalContext.Rule.Name,
"StateModel": evalContext.GetStateModel(),
"Message": evalContext.Rule.Message,
"RuleUrl": ruleUrl,
"ImageLink": context.ImagePublicUrl,
"ImageLink": evalContext.ImagePublicUrl,
"AlertPageUrl": setting.AppUrl + "alerting",
"EvalMatches": context.EvalMatches,
"EvalMatches": evalContext.EvalMatches,
},
To: this.Addresses,
Template: "alert_notification.html",
},
}
if err := bus.Dispatch(cmd); err != nil {
err = bus.DispatchCtx(evalContext, cmd)
if err != nil {
this.log.Error("Failed to send alert notification email", "error", err)
}
return nil
}
......@@ -35,19 +35,19 @@ type SlackNotifier struct {
log log.Logger
}
func (this *SlackNotifier) Notify(context *alerting.EvalContext) {
this.log.Info("Executing slack notification", "ruleId", context.Rule.Id, "notification", this.Name)
func (this *SlackNotifier) Notify(evalContext *alerting.EvalContext) error {
this.log.Info("Executing slack notification", "ruleId", evalContext.Rule.Id, "notification", this.Name)
metrics.M_Alerting_Notification_Sent_Slack.Inc(1)
ruleUrl, err := context.GetRuleUrl()
ruleUrl, err := evalContext.GetRuleUrl()
if err != nil {
this.log.Error("Failed get rule link", "error", err)
return
return err
}
fields := make([]map[string]interface{}, 0)
fieldLimitCount := 4
for index, evt := range context.EvalMatches {
for index, evt := range evalContext.EvalMatches {
fields = append(fields, map[string]interface{}{
"title": evt.Metric,
"value": evt.Value,
......@@ -58,44 +58,41 @@ func (this *SlackNotifier) Notify(context *alerting.EvalContext) {
}
}
if context.Error != nil {
if evalContext.Error != nil {
fields = append(fields, map[string]interface{}{
"title": "Error message",
"value": context.Error.Error(),
"value": evalContext.Error.Error(),
"short": false,
})
}
message := ""
if context.Rule.State != m.AlertStateOK { //dont add message when going back to alert state ok.
message = context.Rule.Message
if evalContext.Rule.State != m.AlertStateOK { //dont add message when going back to alert state ok.
message = evalContext.Rule.Message
}
body := map[string]interface{}{
"attachments": []map[string]interface{}{
{
"color": context.GetStateModel().Color,
"title": context.GetNotificationTitle(),
"color": evalContext.GetStateModel().Color,
"title": evalContext.GetNotificationTitle(),
"title_link": ruleUrl,
"text": message,
"fields": fields,
"image_url": context.ImagePublicUrl,
"image_url": evalContext.ImagePublicUrl,
"footer": "Grafana v" + setting.BuildVersion,
"footer_icon": "http://grafana.org/assets/img/fav32.png",
"ts": time.Now().Unix(),
//"pretext": "Optional text that appears above the attachment block",
// "author_name": "Bobby Tables",
// "author_link": "http://flickr.com/bobby/",
// "author_icon": "http://flickr.com/icons/bobby.jpg",
// "thumb_url": "http://example.com/path/to/thumb.png",
},
},
}
data, _ := json.Marshal(&body)
cmd := &m.SendWebhook{Url: this.Url, Body: string(data)}
cmd := &m.SendWebhookSync{Url: this.Url, Body: string(data)}
if err := bus.Dispatch(cmd); err != nil {
if err := bus.DispatchCtx(evalContext, cmd); err != nil {
this.log.Error("Failed to send slack notification", "error", err, "webhook", this.Name)
}
return nil
}
......@@ -36,36 +36,38 @@ type WebhookNotifier struct {
log log.Logger
}
func (this *WebhookNotifier) Notify(context *alerting.EvalContext) {
func (this *WebhookNotifier) Notify(evalContext *alerting.EvalContext) error {
this.log.Info("Sending webhook")
metrics.M_Alerting_Notification_Sent_Webhook.Inc(1)
bodyJSON := simplejson.New()
bodyJSON.Set("title", context.GetNotificationTitle())
bodyJSON.Set("ruleId", context.Rule.Id)
bodyJSON.Set("ruleName", context.Rule.Name)
bodyJSON.Set("state", context.Rule.State)
bodyJSON.Set("evalMatches", context.EvalMatches)
bodyJSON.Set("title", evalContext.GetNotificationTitle())
bodyJSON.Set("ruleId", evalContext.Rule.Id)
bodyJSON.Set("ruleName", evalContext.Rule.Name)
bodyJSON.Set("state", evalContext.Rule.State)
bodyJSON.Set("evalMatches", evalContext.EvalMatches)
ruleUrl, err := context.GetRuleUrl()
ruleUrl, err := evalContext.GetRuleUrl()
if err == nil {
bodyJSON.Set("rule_url", ruleUrl)
}
if context.ImagePublicUrl != "" {
bodyJSON.Set("image_url", context.ImagePublicUrl)
if evalContext.ImagePublicUrl != "" {
bodyJSON.Set("image_url", evalContext.ImagePublicUrl)
}
body, _ := bodyJSON.MarshalJSON()
cmd := &m.SendWebhook{
cmd := &m.SendWebhookSync{
Url: this.Url,
User: this.User,
Password: this.Password,
Body: string(body),
}
if err := bus.Dispatch(cmd); err != nil {
if err := bus.DispatchCtx(evalContext, cmd); err != nil {
this.log.Error("Failed to send webhook", "error", err, "webhook", this.Name)
}
return nil
}
......@@ -12,7 +12,7 @@ import (
)
type ResultHandler interface {
Handle(ctx *EvalContext)
Handle(evalContext *EvalContext) error
}
type DefaultResultHandler struct {
......@@ -27,36 +27,36 @@ func NewResultHandler() *DefaultResultHandler {
}
}
func (handler *DefaultResultHandler) Handle(ctx *EvalContext) {
oldState := ctx.Rule.State
func (handler *DefaultResultHandler) Handle(evalContext *EvalContext) error {
oldState := evalContext.Rule.State
exeuctionError := ""
annotationData := simplejson.New()
if ctx.Error != nil {
handler.log.Error("Alert Rule Result Error", "ruleId", ctx.Rule.Id, "error", ctx.Error)
ctx.Rule.State = m.AlertStateExecError
exeuctionError = ctx.Error.Error()
if evalContext.Error != nil {
handler.log.Error("Alert Rule Result Error", "ruleId", evalContext.Rule.Id, "error", evalContext.Error)
evalContext.Rule.State = m.AlertStateExecError
exeuctionError = evalContext.Error.Error()
annotationData.Set("errorMessage", exeuctionError)
} else if ctx.Firing {
ctx.Rule.State = m.AlertStateAlerting
annotationData = simplejson.NewFromAny(ctx.EvalMatches)
} else if evalContext.Firing {
evalContext.Rule.State = m.AlertStateAlerting
annotationData = simplejson.NewFromAny(evalContext.EvalMatches)
} else {
// handle no data case
if ctx.NoDataFound {
ctx.Rule.State = ctx.Rule.NoDataState
if evalContext.NoDataFound {
evalContext.Rule.State = evalContext.Rule.NoDataState
} else {
ctx.Rule.State = m.AlertStateOK
evalContext.Rule.State = m.AlertStateOK
}
}
countStateResult(ctx.Rule.State)
if ctx.Rule.State != oldState {
handler.log.Info("New state change", "alertId", ctx.Rule.Id, "newState", ctx.Rule.State, "oldState", oldState)
countStateResult(evalContext.Rule.State)
if evalContext.Rule.State != oldState {
handler.log.Info("New state change", "alertId", evalContext.Rule.Id, "newState", evalContext.Rule.State, "oldState", oldState)
cmd := &m.SetAlertStateCommand{
AlertId: ctx.Rule.Id,
OrgId: ctx.Rule.OrgId,
State: ctx.Rule.State,
AlertId: evalContext.Rule.Id,
OrgId: evalContext.Rule.OrgId,
State: evalContext.Rule.State,
Error: exeuctionError,
EvalData: annotationData,
}
......@@ -67,14 +67,14 @@ func (handler *DefaultResultHandler) Handle(ctx *EvalContext) {
// save annotation
item := annotations.Item{
OrgId: ctx.Rule.OrgId,
DashboardId: ctx.Rule.DashboardId,
PanelId: ctx.Rule.PanelId,
OrgId: evalContext.Rule.OrgId,
DashboardId: evalContext.Rule.DashboardId,
PanelId: evalContext.Rule.PanelId,
Type: annotations.AlertType,
AlertId: ctx.Rule.Id,
Title: ctx.Rule.Name,
Text: ctx.GetStateModel().Text,
NewState: string(ctx.Rule.State),
AlertId: evalContext.Rule.Id,
Title: evalContext.Rule.Name,
Text: evalContext.GetStateModel().Text,
NewState: string(evalContext.Rule.State),
PrevState: string(oldState),
Epoch: time.Now().Unix(),
Data: annotationData,
......@@ -85,8 +85,10 @@ func (handler *DefaultResultHandler) Handle(ctx *EvalContext) {
handler.log.Error("Failed to save annotation for new alert state", "error", err)
}
handler.notifier.Notify(ctx)
handler.notifier.Notify(evalContext)
}
return nil
}
func countStateResult(state m.AlertStateType) {
......
package alerting
import (
"context"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/log"
......@@ -35,13 +37,12 @@ func handleNotificationTestCommand(cmd *NotificationTestCommand) error {
return err
}
notifier.sendNotifications([]Notifier{notifiers}, createTestEvalContext())
notifier.sendNotifications(createTestEvalContext(), []Notifier{notifiers})
return nil
}
func createTestEvalContext() *EvalContext {
testRule := &Rule{
DashboardId: 1,
PanelId: 1,
......@@ -50,7 +51,7 @@ func createTestEvalContext() *EvalContext {
State: m.AlertStateAlerting,
}
ctx := NewEvalContext(testRule)
ctx := NewEvalContext(context.TODO(), testRule)
ctx.ImagePublicUrl = "http://grafana.org/assets/img/blog/mixed_styles.png"
ctx.IsTestRun = true
......
package alerting
import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/bus"
......@@ -48,7 +49,7 @@ func handleAlertTestCommand(cmd *AlertTestCommand) error {
func testAlertRule(rule *Rule) *EvalContext {
handler := NewEvalHandler()
context := NewEvalContext(rule)
context := NewEvalContext(context.TODO(), rule)
context.IsTestRun = true
handler.Eval(context)
......
......@@ -5,8 +5,11 @@
package notifications
import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"html/template"
"net"
"net/mail"
"net/smtp"
......@@ -15,6 +18,7 @@ import (
"time"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
)
......@@ -185,3 +189,49 @@ func buildAndSend(msg *Message) (int, error) {
}
}
}
func buildEmailMessage(cmd *m.SendEmailCommand) (*Message, error) {
if !setting.Smtp.Enabled {
return nil, errors.New("Grafana mailing/smtp options not configured, contact your Grafana admin")
}
var buffer bytes.Buffer
var err error
var subjectText interface{}
data := cmd.Data
if data == nil {
data = make(map[string]interface{}, 10)
}
setDefaultTemplateData(data, nil)
err = mailTemplates.ExecuteTemplate(&buffer, cmd.Template, data)
if err != nil {
return nil, err
}
subjectData := data["Subject"].(map[string]interface{})
subjectText, hasSubject := subjectData["value"]
if !hasSubject {
return nil, errors.New(fmt.Sprintf("Missing subject in Template %s", cmd.Template))
}
subjectTmpl, err := template.New("subject").Parse(subjectText.(string))
if err != nil {
return nil, err
}
var subjectBuffer bytes.Buffer
err = subjectTmpl.ExecuteTemplate(&subjectBuffer, "subject", data)
if err != nil {
return nil, err
}
return &Message{
To: cmd.To,
From: setting.Smtp.FromAddress,
Subject: subjectBuffer.String(),
Body: buffer.String(),
}, nil
}
package notifications
import (
"bytes"
"context"
"errors"
"fmt"
"html/template"
......@@ -29,7 +29,10 @@ func Init() error {
bus.AddHandler("email", validateResetPasswordCode)
bus.AddHandler("email", sendEmailCommandHandler)
bus.AddCtxHandler("email", sendEmailCommandHandlerSync)
bus.AddHandler("webhook", sendWebhook)
bus.AddCtxHandler("webhook", SendWebhookSync)
bus.AddEventListener(signUpStartedHandler)
bus.AddEventListener(signUpCompletedHandler)
......@@ -56,6 +59,15 @@ func Init() error {
return nil
}
func SendWebhookSync(ctx context.Context, cmd *m.SendWebhookSync) error {
return sendWebRequestSync(ctx, &Webhook{
Url: cmd.Url,
User: cmd.User,
Password: cmd.Password,
Body: cmd.Body,
})
}
func sendWebhook(cmd *m.SendWebhook) error {
addToWebhookQueue(&Webhook{
Url: cmd.Url,
......@@ -72,50 +84,32 @@ func subjectTemplateFunc(obj map[string]interface{}, value string) string {
return ""
}
func sendEmailCommandHandler(cmd *m.SendEmailCommand) error {
if !setting.Smtp.Enabled {
return errors.New("Grafana mailing/smtp options not configured, contact your Grafana admin")
}
var buffer bytes.Buffer
var err error
var subjectText interface{}
data := cmd.Data
if data == nil {
data = make(map[string]interface{}, 10)
}
func sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSync) error {
message, err := buildEmailMessage(&m.SendEmailCommand{
Data: cmd.Data,
Info: cmd.Info,
Massive: cmd.Massive,
Template: cmd.Template,
To: cmd.To,
})
setDefaultTemplateData(data, nil)
err = mailTemplates.ExecuteTemplate(&buffer, cmd.Template, data)
if err != nil {
return err
}
subjectData := data["Subject"].(map[string]interface{})
subjectText, hasSubject := subjectData["value"]
_, err = buildAndSend(message)
if !hasSubject {
return errors.New(fmt.Sprintf("Missing subject in Template %s", cmd.Template))
}
subjectTmpl, err := template.New("subject").Parse(subjectText.(string))
if err != nil {
return err
}
}
func sendEmailCommandHandler(cmd *m.SendEmailCommand) error {
message, err := buildEmailMessage(cmd)
var subjectBuffer bytes.Buffer
err = subjectTmpl.ExecuteTemplate(&subjectBuffer, "subject", data)
if err != nil {
return err
}
addToMailQueue(&Message{
To: cmd.To,
From: setting.Smtp.FromAddress,
Subject: subjectBuffer.String(),
Body: buffer.String(),
})
addToMailQueue(message)
return nil
}
......
......@@ -3,7 +3,6 @@ package notifications
import (
"testing"
"github.com/grafana/grafana/pkg/bus"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
. "github.com/smartystreets/goconvey/convey"
......@@ -18,7 +17,7 @@ type testTriggeredAlert struct {
func TestNotifications(t *testing.T) {
Convey("Given the notifications service", t, func() {
bus.ClearBusHandlers()
//bus.ClearBusHandlers()
setting.StaticRootPath = "../../../public/"
setting.Smtp.Enabled = true
......
......@@ -2,11 +2,14 @@ package notifications
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"time"
"golang.org/x/net/context/ctxhttp"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/util"
)
......@@ -31,7 +34,7 @@ func processWebhookQueue() {
for {
select {
case webhook := <-webhookQueue:
err := sendWebRequest(webhook)
err := sendWebRequestSync(context.TODO(), webhook)
if err != nil {
webhookLog.Error("Failed to send webrequest ", "error", err)
......@@ -40,14 +43,14 @@ func processWebhookQueue() {
}
}
func sendWebRequest(webhook *Webhook) error {
func sendWebRequestSync(ctx context.Context, webhook *Webhook) error {
webhookLog.Debug("Sending webhook", "url", webhook.Url)
client := http.Client{
client := &http.Client{
Timeout: time.Duration(10 * time.Second),
}
request, err := http.NewRequest("POST", webhook.Url, bytes.NewReader([]byte(webhook.Body)))
request, err := http.NewRequest(http.MethodPost, webhook.Url, bytes.NewReader([]byte(webhook.Body)))
if webhook.User != "" && webhook.Password != "" {
request.Header.Add("Authorization", util.GetBasicAuthHeader(webhook.User, webhook.Password))
}
......@@ -56,22 +59,23 @@ func sendWebRequest(webhook *Webhook) error {
return err
}
resp, err := client.Do(request)
resp, err := ctxhttp.Do(ctx, client, request)
if err != nil {
return err
}
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
if resp.StatusCode/100 == 2 {
return nil
}
if resp.StatusCode != 200 {
return fmt.Errorf("Webhook response code %v", resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
defer resp.Body.Close()
return nil
webhookLog.Debug("Webhook failed", "statuscode", resp.Status, "body", string(body))
return fmt.Errorf("Webhook response status %v", resp.Status)
}
var addToWebhookQueue = func(msg *Webhook) {
......
package tsdb
import "errors"
import (
"context"
"errors"
)
type Batch struct {
DataSourceId int64
......@@ -20,7 +23,7 @@ func newBatch(dsId int64, queries QuerySlice) *Batch {
}
}
func (bg *Batch) process(context *QueryContext) {
func (bg *Batch) process(ctx context.Context, queryContext *QueryContext) {
executor := getExecutorFor(bg.Queries[0].DataSource)
if executor == nil {
......@@ -32,13 +35,13 @@ func (bg *Batch) process(context *QueryContext) {
for _, query := range bg.Queries {
result.QueryResults[query.RefId] = &QueryResult{Error: result.Error}
}
context.ResultsChan <- result
queryContext.ResultsChan <- result
return
}
res := executor.Execute(bg.Queries, context)
res := executor.Execute(ctx, bg.Queries, queryContext)
bg.Done = true
context.ResultsChan <- res
queryContext.ResultsChan <- res
}
func (bg *Batch) addQuery(query *Query) {
......
package tsdb
import "context"
type Executor interface {
Execute(queries QuerySlice, context *QueryContext) *BatchResult
Execute(ctx context.Context, queries QuerySlice, context *QueryContext) *BatchResult
}
var registry map[string]GetExecutorFn
......
package tsdb
import "context"
type FakeExecutor struct {
results map[string]*QueryResult
resultsFn map[string]ResultsFn
......@@ -14,7 +16,7 @@ func NewFakeExecutor(dsInfo *DataSourceInfo) *FakeExecutor {
}
}
func (e *FakeExecutor) Execute(queries QuerySlice, context *QueryContext) *BatchResult {
func (e *FakeExecutor) Execute(ctx context.Context, queries QuerySlice, context *QueryContext) *BatchResult {
result := &BatchResult{QueryResults: make(map[string]*QueryResult)}
for _, query := range queries {
if results, has := e.results[query.RefId]; has {
......
package graphite
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
......@@ -11,6 +12,8 @@ import (
"strings"
"time"
"golang.org/x/net/context/ctxhttp"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
......@@ -26,7 +29,7 @@ func NewGraphiteExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
var (
glog log.Logger
HttpClient http.Client
HttpClient *http.Client
)
func init() {
......@@ -37,13 +40,13 @@ func init() {
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
HttpClient = http.Client{
HttpClient = &http.Client{
Timeout: time.Duration(15 * time.Second),
Transport: tr,
}
}
func (e *GraphiteExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
func (e *GraphiteExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
formData := url.Values{
......@@ -66,7 +69,8 @@ func (e *GraphiteExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryC
result.Error = err
return result
}
res, err := HttpClient.Do(req)
res, err := ctxhttp.Do(ctx, HttpClient, req)
if err != nil {
result.Error = err
return result
......
package prometheus
import (
"context"
"fmt"
"net/http"
"regexp"
......@@ -11,7 +12,6 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
"github.com/prometheus/client_golang/api/prometheus"
pmodel "github.com/prometheus/common/model"
"golang.org/x/net/context"
)
type PrometheusExecutor struct {
......@@ -45,7 +45,7 @@ func (e *PrometheusExecutor) getClient() (prometheus.QueryAPI, error) {
return prometheus.NewQueryAPI(client), nil
}
func (e *PrometheusExecutor) Execute(queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
client, err := e.getClient()
......@@ -64,7 +64,7 @@ func (e *PrometheusExecutor) Execute(queries tsdb.QuerySlice, queryContext *tsdb
Step: query.Step,
}
value, err := client.QueryRange(context.Background(), query.Expr, timeRange)
value, err := client.QueryRange(ctx, query.Expr, timeRange)
if err != nil {
return resultWithError(result, err)
......
package tsdb
type HandleRequestFunc func(req *Request) (*Response, error)
import "context"
func HandleRequest(req *Request) (*Response, error) {
type HandleRequestFunc func(ctx context.Context, req *Request) (*Response, error)
func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
context := NewQueryContext(req.Queries, req.TimeRange)
batches, err := getBatches(req)
......@@ -16,7 +18,7 @@ func HandleRequest(req *Request) (*Response, error) {
if len(batch.Depends) == 0 {
currentlyExecuting += 1
batch.Started = true
go batch.process(context)
go batch.process(ctx, context)
}
}
......@@ -46,7 +48,7 @@ func HandleRequest(req *Request) (*Response, error) {
if batch.allDependenciesAreIn(context) {
currentlyExecuting += 1
batch.Started = true
go batch.process(context)
go batch.process(ctx, context)
}
}
}
......
package testdata
import (
"context"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/tsdb"
)
......@@ -21,7 +23,7 @@ func init() {
tsdb.RegisterExecutor("grafana-testdata-datasource", NewTestDataExecutor)
}
func (e *TestDataExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
func (e *TestDataExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
result.QueryResults = make(map[string]*tsdb.QueryResult)
......
package tsdb
import (
"context"
"testing"
"time"
......@@ -62,7 +63,7 @@ func TestMetricQuery(t *testing.T) {
fakeExecutor := registerFakeExecutor()
fakeExecutor.Return("A", TimeSeriesSlice{&TimeSeries{Name: "argh"}})
res, err := HandleRequest(req)
res, err := HandleRequest(context.TODO(), req)
So(err, ShouldBeNil)
Convey("Should return query results", func() {
......@@ -83,7 +84,7 @@ func TestMetricQuery(t *testing.T) {
fakeExecutor.Return("A", TimeSeriesSlice{&TimeSeries{Name: "argh"}})
fakeExecutor.Return("B", TimeSeriesSlice{&TimeSeries{Name: "barg"}})
res, err := HandleRequest(req)
res, err := HandleRequest(context.TODO(), req)
So(err, ShouldBeNil)
Convey("Should return query results", func() {
......@@ -106,7 +107,7 @@ func TestMetricQuery(t *testing.T) {
},
}
res, err := HandleRequest(req)
res, err := HandleRequest(context.TODO(), req)
So(err, ShouldBeNil)
Convey("Should have been batched in two requests", func() {
......@@ -121,7 +122,7 @@ func TestMetricQuery(t *testing.T) {
},
}
_, err := HandleRequest(req)
_, err := HandleRequest(context.TODO(), req)
So(err, ShouldNotBeNil)
})
......@@ -152,7 +153,7 @@ func TestMetricQuery(t *testing.T) {
}}
})
res, err := HandleRequest(req)
res, err := HandleRequest(context.TODO(), req)
So(err, ShouldBeNil)
Convey("Should have been batched in two requests", func() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment