Commit 15ce4746 by bergquist

wip

parent d0932442
......@@ -96,9 +96,7 @@ type AlertNotificationState struct {
}
type SetAlertNotificationStateToPendingCommand struct {
Id int64
SentAt int64
Version int64
State *AlertNotificationState
}
type SetAlertNotificationStateToCompleteCommand struct {
......
......@@ -3,6 +3,8 @@ package alerting
import (
"context"
"time"
"github.com/grafana/grafana/pkg/models"
)
type EvalHandler interface {
......@@ -20,7 +22,7 @@ type Notifier interface {
NeedsImage() bool
// ShouldNotify checks this evaluation should send an alert notification
ShouldNotify(ctx context.Context, evalContext *EvalContext) bool
ShouldNotify(ctx context.Context, evalContext *EvalContext, notificationState *models.AlertNotificationState) bool
GetNotifierId() int64
GetIsDefault() bool
......@@ -28,11 +30,16 @@ type Notifier interface {
GetFrequency() time.Duration
}
type NotifierSlice []Notifier
type NotifierState struct {
notifier Notifier
state *models.AlertNotificationState
}
type NotifierStateSlice []*NotifierState
func (notifiers NotifierSlice) ShouldUploadImage() bool {
for _, notifier := range notifiers {
if notifier.NeedsImage() {
func (notifiers NotifierStateSlice) ShouldUploadImage() bool {
for _, ns := range notifiers {
if ns.notifier.NeedsImage() {
return true
}
}
......
package alerting
import (
"context"
"errors"
"fmt"
......@@ -39,64 +38,59 @@ type notificationService struct {
}
func (n *notificationService) SendIfNeeded(context *EvalContext) error {
notifiers, err := n.getNeededNotifiers(context.Rule.OrgId, context.Rule.Notifications, context)
notifierStates, err := n.getNeededNotifiers(context.Rule.OrgId, context.Rule.Notifications, context)
if err != nil {
return err
}
if len(notifiers) == 0 {
if len(notifierStates) == 0 {
return nil
}
if notifiers.ShouldUploadImage() {
if notifierStates.ShouldUploadImage() {
if err = n.uploadImage(context); err != nil {
n.log.Error("Failed to upload alert panel image.", "error", err)
}
}
return n.sendNotifications(context, notifiers)
}
func (n *notificationService) sendNotifications(evalContext *EvalContext, notifiers []Notifier) error {
for _, notifier := range notifiers {
not := notifier
err := bus.InTransaction(evalContext.Ctx, func(ctx context.Context) error {
n.log.Debug("trying to send notification", "id", not.GetNotifierId())
// get alert notification (version = 1)
// phantomjs 15 sek
// loopa notifier - ge mig ett lås! where version = 1
// send notification
// Släpp lås
//
// insert if needed
return n.sendNotifications(context, notifierStates)
}
// Verify that we can send the notification again
// but this time within the same transaction.
// if !evalContext.IsTestRun && !not.ShouldNotify(ctx, evalContext) {
// return nil
// }
func (n *notificationService) sendAndMarkAsComplete(evalContext *EvalContext, notifierState *NotifierState) error {
return nil
}
// n.log.Debug("Sending notification", "type", not.GetType(), "id", not.GetNotifierId(), "isDefault", not.GetIsDefault())
// metrics.M_Alerting_Notification_Sent.WithLabelValues(not.GetType()).Inc()
func (n *notificationService) sendNotification(evalContext *EvalContext, notifierState *NotifierState) error {
n.log.Debug("trying to send notification", "id", notifierState.notifier.GetNotifierId())
// //send notification
// // success := not.Notify(evalContext) == nil
setPendingCmd := &m.SetAlertNotificationStateToPendingCommand{
State: notifierState.state,
}
// if evalContext.IsTestRun {
// return nil
// }
err := bus.DispatchCtx(evalContext.Ctx, setPendingCmd)
if err == m.ErrAlertNotificationStateVersionConflict {
return nil
}
//write result to db.
// cmd := &m.RecordNotificationJournalCommand{
// OrgId: evalContext.Rule.OrgId,
// AlertId: evalContext.Rule.Id,
// NotifierId: not.GetNotifierId(),
// SentAt: time.Now().Unix(),
// Success: success,
// }
if err != nil {
return err
}
// return bus.DispatchCtx(ctx, cmd)
return nil
})
return n.sendAndMarkAsComplete(evalContext, notifierState)
}
func (n *notificationService) sendNotifications(evalContext *EvalContext, notifierStates NotifierStateSlice) error {
for _, notifierState := range notifierStates {
err := n.sendNotification(evalContext, notifierState)
if err != nil {
n.log.Error("failed to send notification", "id", not.GetNotifierId())
n.log.Error("failed to send notification", "id", notifierState.notifier.GetNotifierId())
}
}
......@@ -143,22 +137,38 @@ func (n *notificationService) uploadImage(context *EvalContext) (err error) {
return nil
}
func (n *notificationService) getNeededNotifiers(orgId int64, notificationIds []int64, evalContext *EvalContext) (NotifierSlice, error) {
func (n *notificationService) getNeededNotifiers(orgId int64, notificationIds []int64, evalContext *EvalContext) (NotifierStateSlice, error) {
query := &m.GetAlertNotificationsToSendQuery{OrgId: orgId, Ids: notificationIds}
if err := bus.Dispatch(query); err != nil {
return nil, err
}
var result []Notifier
var result NotifierStateSlice
for _, notification := range query.Result {
not, err := n.createNotifierFor(notification)
if err != nil {
return nil, err
n.log.Error("Could not create notifier", "notifier", notification.Id)
continue
}
query := &m.GetNotificationStateQuery{
NotifierId: notification.Id,
AlertId: evalContext.Rule.Id,
OrgId: evalContext.Rule.OrgId,
}
err = bus.DispatchCtx(evalContext.Ctx, query)
if err != nil {
n.log.Error("Could not get notification state.", "notifier", notification.Id)
continue
}
if not.ShouldNotify(evalContext.Ctx, evalContext) {
result = append(result, not)
if not.ShouldNotify(evalContext.Ctx, evalContext, query.Result) {
result = append(result, &NotifierState{
notifier: not,
state: query.Result,
})
}
}
......
......@@ -48,15 +48,6 @@ func defaultShouldNotify(context *alerting.EvalContext, sendReminder bool, frequ
return false
}
// get last successfully sent notification
// lastNotify := time.Time{}
// for _, j := range journals {
// if j.Success {
// lastNotify = time.Unix(j.SentAt, 0)
// break
// }
// }
// Do not notify if interval has not elapsed
lastNotify := time.Unix(notificationState.SentAt, 0)
if sendReminder && !lastNotify.IsZero() && lastNotify.Add(frequency).After(time.Now()) {
......@@ -77,7 +68,7 @@ func defaultShouldNotify(context *alerting.EvalContext, sendReminder bool, frequ
}
// ShouldNotify checks this evaluation should send an alert notification
func (n *NotifierBase) ShouldNotify(ctx context.Context, c *alerting.EvalContext) bool {
func (n *NotifierBase) ShouldNotify(ctx context.Context, c *alerting.EvalContext, notiferState *models.AlertNotificationState) bool {
cmd := &models.GetNotificationStateQuery{
OrgId: c.Rule.OrgId,
AlertId: c.Rule.Id,
......
......@@ -283,7 +283,7 @@ func SetAlertNotificationStateToPendingCommand(ctx context.Context, cmd *m.SetAl
id = ? AND
version = ?`
res, err := sess.Exec(sql, m.AlertNotificationStatePending, cmd.Version+1, cmd.Id, cmd.Version)
res, err := sess.Exec(sql, m.AlertNotificationStatePending, cmd.State.Version+1, cmd.State.Id, cmd.State.Version)
if err != nil {
return err
}
......@@ -308,11 +308,41 @@ func GetAlertNotificationState(ctx context.Context, cmd *m.GetNotificationStateQ
Where("alert_notification_state.notifier_id = ?", cmd.NotifierId).
Get(nj)
// if exists, return it, otherwise create it with default values
if err != nil {
return err
}
if !exist {
notificationState := &m.AlertNotificationState{
OrgId: cmd.OrgId,
AlertId: cmd.AlertId,
NotifierId: cmd.NotifierId,
State: "unknown",
}
_, err := sess.Insert(notificationState)
if err == nil {
return nil
}
uniqenessIndexFailureCodes := []string{
"UNIQUE constraint failed",
"pq: duplicate key value violates unique constraint",
"Error 1062: Duplicate entry ",
}
var alreadyExists bool
for _, code := range uniqenessIndexFailureCodes {
if strings.HasPrefix(err.Error(), code) {
alreadyExists = true
}
}
return err
return m.ErrAlertNotificationStateNotFound
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment