Commit 9af809ff by Marcus Efraimsson Committed by GitHub

Merge pull request #13440 from grafana/reminder_refactoring

Transaction issues for alert reminder
parents 321c09ae 0e090187
......@@ -20,6 +20,12 @@
revision = "7677a1d7c1137cd3dd5ba7a076d0c898a1ef4520"
[[projects]]
branch = "master"
name = "github.com/VividCortex/mysqlerr"
packages = ["."]
revision = "6c6b55f8796f578c870b7e19bafb16103bc40095"
[[projects]]
name = "github.com/aws/aws-sdk-go"
packages = [
"aws",
......@@ -673,6 +679,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "81a37e747b875cf870c1b9486fa3147e704dea7db8ba86f7cb942d3ddc01d3e3"
inputs-digest = "6e9458f912a5f0eb3430b968f1b4dbc4e3b7671b282cf4fe1573419a6d9ba0d4"
solver-name = "gps-cdcl"
solver-version = 1
......@@ -203,3 +203,7 @@ ignored = [
[[constraint]]
name = "github.com/denisenkom/go-mssqldb"
revision = "270bc3860bb94dd3a3ffd047377d746c5e276726"
[[constraint]]
name = "github.com/VividCortex/mysqlerr"
branch = "master"
......@@ -8,18 +8,33 @@ services:
volumes:
- /var/run/docker.sock:/tmp/docker.sock:ro
mysql:
db:
image: mysql
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: grafana
MYSQL_USER: grafana
MYSQL_PASSWORD: password
ports:
- 3306
healthcheck:
test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
timeout: 10s
retries: 10
# db:
# image: postgres:9.3
# environment:
# POSTGRES_DATABASE: grafana
# POSTGRES_USER: grafana
# POSTGRES_PASSWORD: password
# ports:
# - 5432
# healthcheck:
# test: ["CMD-SHELL", "pg_isready -d grafana -U grafana"]
# timeout: 10s
# retries: 10
grafana:
image: grafana/grafana:dev
volumes:
......@@ -27,17 +42,23 @@ services:
environment:
- VIRTUAL_HOST=grafana.loc
- GF_SERVER_ROOT_URL=http://grafana.loc
- GF_DATABASE_TYPE=mysql
- GF_DATABASE_HOST=mysql:3306
- GF_DATABASE_NAME=grafana
- GF_DATABASE_USER=grafana
- GF_DATABASE_PASSWORD=password
- GF_DATABASE_TYPE=mysql
- GF_DATABASE_HOST=db:3306
- GF_SESSION_PROVIDER=mysql
- GF_SESSION_PROVIDER_CONFIG=grafana:password@tcp(mysql:3306)/grafana?allowNativePasswords=true
- GF_SESSION_PROVIDER_CONFIG=grafana:password@tcp(db:3306)/grafana?allowNativePasswords=true
# - GF_DATABASE_TYPE=postgres
# - GF_DATABASE_HOST=db:5432
# - GF_DATABASE_SSL_MODE=disable
# - GF_SESSION_PROVIDER=postgres
# - GF_SESSION_PROVIDER_CONFIG=user=grafana password=password host=db port=5432 dbname=grafana sslmode=disable
- GF_LOG_FILTERS=alerting.notifier:debug,alerting.notifier.slack:debug
ports:
- 3000
depends_on:
mysql:
db:
condition: service_healthy
prometheus:
......@@ -54,4 +75,4 @@ services:
# environment:
# - DATA_SOURCE_NAME=grafana:password@(mysql:3306)/
# ports:
# - 9104
\ No newline at end of file
# - 9104
......@@ -75,7 +75,7 @@ type Alert struct {
EvalData *simplejson.Json
NewStateDate time.Time
StateChanges int
StateChanges int64
Created time.Time
Updated time.Time
......@@ -156,7 +156,7 @@ type SetAlertStateCommand struct {
Error string
EvalData *simplejson.Json
Timestamp time.Time
Result Alert
}
//Queries
......
......@@ -8,8 +8,18 @@ import (
)
var (
ErrNotificationFrequencyNotFound = errors.New("Notification frequency not specified")
ErrJournalingNotFound = errors.New("alert notification journaling not found")
ErrNotificationFrequencyNotFound = errors.New("Notification frequency not specified")
ErrAlertNotificationStateNotFound = errors.New("alert notification state not found")
ErrAlertNotificationStateVersionConflict = errors.New("alert notification state update version conflict")
ErrAlertNotificationStateAlreadyExist = errors.New("alert notification state already exists.")
)
type AlertNotificationStateType string
var (
AlertNotificationStatePending = AlertNotificationStateType("pending")
AlertNotificationStateCompleted = AlertNotificationStateType("completed")
AlertNotificationStateUnknown = AlertNotificationStateType("unknown")
)
type AlertNotification struct {
......@@ -76,33 +86,34 @@ type GetAllAlertNotificationsQuery struct {
Result []*AlertNotification
}
type AlertNotificationJournal struct {
Id int64
OrgId int64
AlertId int64
NotifierId int64
SentAt int64
Success bool
type AlertNotificationState struct {
Id int64
OrgId int64
AlertId int64
NotifierId int64
State AlertNotificationStateType
Version int64
UpdatedAt int64
AlertRuleStateUpdatedVersion int64
}
type RecordNotificationJournalCommand struct {
OrgId int64
AlertId int64
NotifierId int64
SentAt int64
Success bool
}
type SetAlertNotificationStateToPendingCommand struct {
Id int64
AlertRuleStateUpdatedVersion int64
Version int64
type GetLatestNotificationQuery struct {
OrgId int64
AlertId int64
NotifierId int64
ResultVersion int64
}
Result []AlertNotificationJournal
type SetAlertNotificationStateToCompleteCommand struct {
Id int64
Version int64
}
type CleanNotificationJournalCommand struct {
type GetOrCreateNotificationStateQuery struct {
OrgId int64
AlertId int64
NotifierId int64
Result *AlertNotificationState
}
......@@ -3,6 +3,8 @@ package alerting
import (
"context"
"time"
"github.com/grafana/grafana/pkg/models"
)
type EvalHandler interface {
......@@ -20,7 +22,7 @@ type Notifier interface {
NeedsImage() bool
// ShouldNotify checks this evaluation should send an alert notification
ShouldNotify(ctx context.Context, evalContext *EvalContext) bool
ShouldNotify(ctx context.Context, evalContext *EvalContext, notificationState *models.AlertNotificationState) bool
GetNotifierId() int64
GetIsDefault() bool
......@@ -28,11 +30,16 @@ type Notifier interface {
GetFrequency() time.Duration
}
type NotifierSlice []Notifier
type notifierState struct {
notifier Notifier
state *models.AlertNotificationState
}
type notifierStateSlice []*notifierState
func (notifiers NotifierSlice) ShouldUploadImage() bool {
for _, notifier := range notifiers {
if notifier.NeedsImage() {
func (notifiers notifierStateSlice) ShouldUploadImage() bool {
for _, ns := range notifiers {
if ns.notifier.NeedsImage() {
return true
}
}
......
package alerting
import (
"context"
"errors"
"fmt"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/imguploader"
......@@ -41,61 +39,78 @@ type notificationService struct {
}
func (n *notificationService) SendIfNeeded(context *EvalContext) error {
notifiers, err := n.getNeededNotifiers(context.Rule.OrgId, context.Rule.Notifications, context)
notifierStates, err := n.getNeededNotifiers(context.Rule.OrgId, context.Rule.Notifications, context)
if err != nil {
return err
}
if len(notifiers) == 0 {
if len(notifierStates) == 0 {
return nil
}
if notifiers.ShouldUploadImage() {
if notifierStates.ShouldUploadImage() {
if err = n.uploadImage(context); err != nil {
n.log.Error("Failed to upload alert panel image.", "error", err)
}
}
return n.sendNotifications(context, notifiers)
return n.sendNotifications(context, notifierStates)
}
func (n *notificationService) sendNotifications(evalContext *EvalContext, notifiers []Notifier) error {
for _, notifier := range notifiers {
not := notifier
func (n *notificationService) sendAndMarkAsComplete(evalContext *EvalContext, notifierState *notifierState) error {
notifier := notifierState.notifier
err := bus.InTransaction(evalContext.Ctx, func(ctx context.Context) error {
n.log.Debug("trying to send notification", "id", not.GetNotifierId())
n.log.Debug("Sending notification", "type", notifier.GetType(), "id", notifier.GetNotifierId(), "isDefault", notifier.GetIsDefault())
metrics.M_Alerting_Notification_Sent.WithLabelValues(notifier.GetType()).Inc()
// Verify that we can send the notification again
// but this time within the same transaction.
if !evalContext.IsTestRun && !not.ShouldNotify(ctx, evalContext) {
return nil
}
err := notifier.Notify(evalContext)
n.log.Debug("Sending notification", "type", not.GetType(), "id", not.GetNotifierId(), "isDefault", not.GetIsDefault())
metrics.M_Alerting_Notification_Sent.WithLabelValues(not.GetType()).Inc()
if err != nil {
n.log.Error("failed to send notification", "id", notifier.GetNotifierId(), "error", err)
}
//send notification
success := not.Notify(evalContext) == nil
if evalContext.IsTestRun {
return nil
}
if evalContext.IsTestRun {
return nil
}
cmd := &m.SetAlertNotificationStateToCompleteCommand{
Id: notifierState.state.Id,
Version: notifierState.state.Version,
}
//write result to db.
cmd := &m.RecordNotificationJournalCommand{
OrgId: evalContext.Rule.OrgId,
AlertId: evalContext.Rule.Id,
NotifierId: not.GetNotifierId(),
SentAt: time.Now().Unix(),
Success: success,
}
return bus.DispatchCtx(evalContext.Ctx, cmd)
}
return bus.DispatchCtx(ctx, cmd)
})
func (n *notificationService) sendNotification(evalContext *EvalContext, notifierState *notifierState) error {
if !evalContext.IsTestRun {
setPendingCmd := &m.SetAlertNotificationStateToPendingCommand{
Id: notifierState.state.Id,
Version: notifierState.state.Version,
AlertRuleStateUpdatedVersion: evalContext.Rule.StateChanges,
}
err := bus.DispatchCtx(evalContext.Ctx, setPendingCmd)
if err == m.ErrAlertNotificationStateVersionConflict {
return nil
}
if err != nil {
n.log.Error("failed to send notification", "id", not.GetNotifierId())
return err
}
// We need to update state version to be able to log
// unexpected version conflicts when marking notifications as ok
notifierState.state.Version = setPendingCmd.ResultVersion
}
return n.sendAndMarkAsComplete(evalContext, notifierState)
}
func (n *notificationService) sendNotifications(evalContext *EvalContext, notifierStates notifierStateSlice) error {
for _, notifierState := range notifierStates {
err := n.sendNotification(evalContext, notifierState)
if err != nil {
n.log.Error("failed to send notification", "id", notifierState.notifier.GetNotifierId(), "error", err)
}
}
......@@ -142,22 +157,38 @@ func (n *notificationService) uploadImage(context *EvalContext) (err error) {
return nil
}
func (n *notificationService) getNeededNotifiers(orgId int64, notificationIds []int64, evalContext *EvalContext) (NotifierSlice, error) {
func (n *notificationService) getNeededNotifiers(orgId int64, notificationIds []int64, evalContext *EvalContext) (notifierStateSlice, error) {
query := &m.GetAlertNotificationsToSendQuery{OrgId: orgId, Ids: notificationIds}
if err := bus.Dispatch(query); err != nil {
return nil, err
}
var result []Notifier
var result notifierStateSlice
for _, notification := range query.Result {
not, err := n.createNotifierFor(notification)
if err != nil {
return nil, err
n.log.Error("Could not create notifier", "notifier", notification.Id, "error", err)
continue
}
query := &m.GetOrCreateNotificationStateQuery{
NotifierId: notification.Id,
AlertId: evalContext.Rule.Id,
OrgId: evalContext.Rule.OrgId,
}
err = bus.DispatchCtx(evalContext.Ctx, query)
if err != nil {
n.log.Error("Could not get notification state.", "notifier", notification.Id, "error", err)
continue
}
if not.ShouldNotify(evalContext.Ctx, evalContext) {
result = append(result, not)
if not.ShouldNotify(evalContext.Ctx, evalContext, query.Result) {
result = append(result, &notifierState{
notifier: not,
state: query.Result,
})
}
}
......
......@@ -46,7 +46,7 @@ type AlertmanagerNotifier struct {
log log.Logger
}
func (this *AlertmanagerNotifier) ShouldNotify(ctx context.Context, evalContext *alerting.EvalContext) bool {
func (this *AlertmanagerNotifier) ShouldNotify(ctx context.Context, evalContext *alerting.EvalContext, notificationState *m.AlertNotificationState) bool {
this.log.Debug("Should notify", "ruleId", evalContext.Rule.Id, "state", evalContext.Rule.State, "previousState", evalContext.PrevAlertState)
// Do not notify when we become OK for the first time.
......
......@@ -4,7 +4,6 @@ import (
"context"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
......@@ -46,54 +45,45 @@ func NewNotifierBase(model *models.AlertNotification) NotifierBase {
}
}
func defaultShouldNotify(context *alerting.EvalContext, sendReminder bool, frequency time.Duration, journals []models.AlertNotificationJournal) bool {
// ShouldNotify checks this evaluation should send an alert notification
func (n *NotifierBase) ShouldNotify(ctx context.Context, context *alerting.EvalContext, notiferState *models.AlertNotificationState) bool {
// Only notify on state change.
if context.PrevAlertState == context.Rule.State && !sendReminder {
if context.PrevAlertState == context.Rule.State && !n.SendReminder {
return false
}
// get last successfully sent notification
lastNotify := time.Time{}
for _, j := range journals {
if j.Success {
lastNotify = time.Unix(j.SentAt, 0)
break
if context.PrevAlertState == context.Rule.State && n.SendReminder {
// Do not notify if interval has not elapsed
lastNotify := time.Unix(notiferState.UpdatedAt, 0)
if notiferState.UpdatedAt != 0 && lastNotify.Add(n.Frequency).After(time.Now()) {
return false
}
}
// Do not notify if interval has not elapsed
if sendReminder && !lastNotify.IsZero() && lastNotify.Add(frequency).After(time.Now()) {
return false
}
// Do not notify if alert state if OK or pending even on repeated notify
if sendReminder && (context.Rule.State == models.AlertStateOK || context.Rule.State == models.AlertStatePending) {
return false
// Do not notify if alert state is OK or pending even on repeated notify
if context.Rule.State == models.AlertStateOK || context.Rule.State == models.AlertStatePending {
return false
}
}
// Do not notify when we become OK for the first time.
if (context.PrevAlertState == models.AlertStatePending) && (context.Rule.State == models.AlertStateOK) {
if context.PrevAlertState == models.AlertStatePending && context.Rule.State == models.AlertStateOK {
return false
}
return true
}
// ShouldNotify checks this evaluation should send an alert notification
func (n *NotifierBase) ShouldNotify(ctx context.Context, c *alerting.EvalContext) bool {
cmd := &models.GetLatestNotificationQuery{
OrgId: c.Rule.OrgId,
AlertId: c.Rule.Id,
NotifierId: n.Id,
// Do not notify when we OK -> Pending
if context.PrevAlertState == models.AlertStateOK && context.Rule.State == models.AlertStatePending {
return false
}
err := bus.DispatchCtx(ctx, cmd)
if err != nil {
n.log.Error("Could not determine last time alert notifier fired", "Alert name", c.Rule.Name, "Error", err)
return false
// Do not notifu if state pending and it have been updated last minute
if notiferState.State == models.AlertNotificationStatePending {
lastUpdated := time.Unix(notiferState.UpdatedAt, 0)
if lastUpdated.Add(1 * time.Minute).After(time.Now()) {
return false
}
}
return defaultShouldNotify(c, n.SendReminder, n.Frequency, cmd.Result)
return true
}
func (n *NotifierBase) GetType() string {
......
......@@ -2,12 +2,9 @@ package notifiers
import (
"context"
"errors"
"testing"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/alerting"
......@@ -23,34 +20,34 @@ func TestShouldSendAlertNotification(t *testing.T) {
newState m.AlertStateType
sendReminder bool
frequency time.Duration
journals []m.AlertNotificationJournal
state *m.AlertNotificationState
expect bool
}{
{
name: "pending -> ok should not trigger an notification",
newState: m.AlertStatePending,
prevState: m.AlertStateOK,
newState: m.AlertStateOK,
prevState: m.AlertStatePending,
sendReminder: false,
journals: []m.AlertNotificationJournal{},
state: &m.AlertNotificationState{},
expect: false,
},
{
name: "ok -> alerting should trigger an notification",
newState: m.AlertStateOK,
prevState: m.AlertStateAlerting,
newState: m.AlertStateAlerting,
prevState: m.AlertStateOK,
sendReminder: false,
journals: []m.AlertNotificationJournal{},
state: &m.AlertNotificationState{},
expect: true,
},
{
name: "ok -> pending should not trigger an notification",
newState: m.AlertStateOK,
prevState: m.AlertStatePending,
newState: m.AlertStatePending,
prevState: m.AlertStateOK,
sendReminder: false,
journals: []m.AlertNotificationJournal{},
state: &m.AlertNotificationState{},
expect: false,
},
......@@ -59,100 +56,100 @@ func TestShouldSendAlertNotification(t *testing.T) {
newState: m.AlertStateOK,
prevState: m.AlertStateOK,
sendReminder: false,
journals: []m.AlertNotificationJournal{},
state: &m.AlertNotificationState{},
expect: false,
},
{
name: "ok -> alerting should trigger an notification",
name: "ok -> ok with reminder should not trigger an notification",
newState: m.AlertStateOK,
prevState: m.AlertStateAlerting,
prevState: m.AlertStateOK,
sendReminder: true,
journals: []m.AlertNotificationJournal{},
state: &m.AlertNotificationState{},
expect: false,
},
{
name: "alerting -> ok should trigger an notification",
newState: m.AlertStateOK,
prevState: m.AlertStateAlerting,
sendReminder: false,
state: &m.AlertNotificationState{},
expect: true,
},
{
name: "ok -> ok with reminder should not trigger an notification",
name: "alerting -> ok should trigger an notification when reminders enabled",
newState: m.AlertStateOK,
prevState: m.AlertStateOK,
prevState: m.AlertStateAlerting,
frequency: time.Minute * 10,
sendReminder: true,
journals: []m.AlertNotificationJournal{},
state: &m.AlertNotificationState{UpdatedAt: tnow.Add(-time.Minute).Unix()},
expect: false,
expect: true,
},
{
name: "alerting -> alerting with reminder and no journaling should trigger",
name: "alerting -> alerting with reminder and no state should trigger",
newState: m.AlertStateAlerting,
prevState: m.AlertStateAlerting,
frequency: time.Minute * 10,
sendReminder: true,
journals: []m.AlertNotificationJournal{},
state: &m.AlertNotificationState{},
expect: true,
},
{
name: "alerting -> alerting with reminder and successful recent journal event should not trigger",
name: "alerting -> alerting with reminder and last notification sent 1 minute ago should not trigger",
newState: m.AlertStateAlerting,
prevState: m.AlertStateAlerting,
frequency: time.Minute * 10,
sendReminder: true,
journals: []m.AlertNotificationJournal{
{SentAt: tnow.Add(-time.Minute).Unix(), Success: true},
},
state: &m.AlertNotificationState{UpdatedAt: tnow.Add(-time.Minute).Unix()},
expect: false,
},
{
name: "alerting -> alerting with reminder and failed recent journal event should trigger",
name: "alerting -> alerting with reminder and last notifciation sent 11 minutes ago should trigger",
newState: m.AlertStateAlerting,
prevState: m.AlertStateAlerting,
frequency: time.Minute * 10,
sendReminder: true,
expect: true,
journals: []m.AlertNotificationJournal{
{SentAt: tnow.Add(-time.Minute).Unix(), Success: false}, // recent failed notification
{SentAt: tnow.Add(-time.Hour).Unix(), Success: true}, // old successful notification
},
state: &m.AlertNotificationState{UpdatedAt: tnow.Add(-11 * time.Minute).Unix()},
expect: true,
},
{
name: "OK -> alerting with notifciation state pending and updated 30 seconds ago should not trigger",
newState: m.AlertStateAlerting,
prevState: m.AlertStateOK,
state: &m.AlertNotificationState{State: m.AlertNotificationStatePending, UpdatedAt: tnow.Add(-30 * time.Second).Unix()},
expect: false,
},
{
name: "OK -> alerting with notifciation state pending and updated 2 minutes ago should trigger",
newState: m.AlertStateAlerting,
prevState: m.AlertStateOK,
state: &m.AlertNotificationState{State: m.AlertNotificationStatePending, UpdatedAt: tnow.Add(-2 * time.Minute).Unix()},
expect: true,
},
}
for _, tc := range tcs {
evalContext := alerting.NewEvalContext(context.TODO(), &alerting.Rule{
State: tc.newState,
State: tc.prevState,
})
evalContext.Rule.State = tc.prevState
if defaultShouldNotify(evalContext, true, tc.frequency, tc.journals) != tc.expect {
evalContext.Rule.State = tc.newState
nb := &NotifierBase{SendReminder: tc.sendReminder, Frequency: tc.frequency}
if nb.ShouldNotify(evalContext.Ctx, evalContext, tc.state) != tc.expect {
t.Errorf("failed test %s.\n expected \n%+v \nto return: %v", tc.name, tc, tc.expect)
}
}
}
func TestShouldNotifyWhenNoJournalingIsFound(t *testing.T) {
Convey("base notifier", t, func() {
bus.ClearBusHandlers()
notifier := NewNotifierBase(&m.AlertNotification{
Id: 1,
Name: "name",
Type: "email",
Settings: simplejson.New(),
})
evalContext := alerting.NewEvalContext(context.TODO(), &alerting.Rule{})
Convey("should not notify query returns error", func() {
bus.AddHandlerCtx("", func(ctx context.Context, q *m.GetLatestNotificationQuery) error {
return errors.New("some kind of error unknown error")
})
if notifier.ShouldNotify(context.Background(), evalContext) {
t.Errorf("should not send notifications when query returns error")
}
})
})
}
func TestBaseNotifier(t *testing.T) {
Convey("default constructor for notifiers", t, func() {
bJson := simplejson.New()
......
......@@ -67,6 +67,12 @@ func (handler *DefaultResultHandler) Handle(evalContext *EvalContext) error {
}
handler.log.Error("Failed to save state", "error", err)
} else {
// StateChanges is used for de duping alert notifications
// when two servers are raising. This makes sure that the server
// with the last state change always sends a notification.
evalContext.Rule.StateChanges = cmd.Result.StateChanges
}
// save annotation
......@@ -88,19 +94,6 @@ func (handler *DefaultResultHandler) Handle(evalContext *EvalContext) error {
}
}
if evalContext.Rule.State == m.AlertStateOK && evalContext.PrevAlertState != m.AlertStateOK {
for _, notifierId := range evalContext.Rule.Notifications {
cmd := &m.CleanNotificationJournalCommand{
AlertId: evalContext.Rule.Id,
NotifierId: notifierId,
OrgId: evalContext.Rule.OrgId,
}
if err := bus.DispatchCtx(evalContext.Ctx, cmd); err != nil {
handler.log.Error("Failed to clean up old notification records", "notifier", notifierId, "alert", evalContext.Rule.Id, "Error", err)
}
}
}
handler.notifier.SendIfNeeded(evalContext)
return nil
}
......@@ -23,6 +23,8 @@ type Rule struct {
State m.AlertStateType
Conditions []Condition
Notifications []int64
StateChanges int64
}
type ValidationError struct {
......@@ -100,6 +102,7 @@ func NewRuleFromDBAlert(ruleDef *m.Alert) (*Rule, error) {
model.State = ruleDef.State
model.NoDataState = m.NoDataOption(ruleDef.Settings.Get("noDataState").MustString("no_data"))
model.ExecutionErrorState = m.ExecutionErrorOption(ruleDef.Settings.Get("executionErrorState").MustString("alerting"))
model.StateChanges = ruleDef.StateChanges
for _, v := range ruleDef.Settings.Get("notifications").MustArray() {
jsonModel := simplejson.NewFromAny(v)
......
......@@ -39,7 +39,7 @@ func handleNotificationTestCommand(cmd *NotificationTestCommand) error {
return err
}
return notifier.sendNotifications(createTestEvalContext(cmd), []Notifier{notifiers})
return notifier.sendNotifications(createTestEvalContext(cmd), notifierStateSlice{{notifier: notifiers}})
}
func createTestEvalContext(cmd *NotificationTestCommand) *EvalContext {
......
......@@ -60,6 +60,10 @@ func deleteAlertByIdInternal(alertId int64, reason string, sess *DBSession) erro
return err
}
if _, err := sess.Exec("DELETE FROM alert_notification_state WHERE alert_id = ?", alertId); err != nil {
return err
}
return nil
}
......@@ -275,6 +279,8 @@ func SetAlertState(cmd *m.SetAlertStateCommand) error {
}
sess.ID(alert.Id).Update(&alert)
cmd.Result = alert
return nil
})
}
......
......@@ -3,6 +3,7 @@ package sqlstore
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"
......@@ -18,16 +19,23 @@ func init() {
bus.AddHandler("sql", DeleteAlertNotification)
bus.AddHandler("sql", GetAlertNotificationsToSend)
bus.AddHandler("sql", GetAllAlertNotifications)
bus.AddHandlerCtx("sql", RecordNotificationJournal)
bus.AddHandlerCtx("sql", GetLatestNotification)
bus.AddHandlerCtx("sql", CleanNotificationJournal)
bus.AddHandlerCtx("sql", GetOrCreateAlertNotificationState)
bus.AddHandlerCtx("sql", SetAlertNotificationStateToCompleteCommand)
bus.AddHandlerCtx("sql", SetAlertNotificationStateToPendingCommand)
}
func DeleteAlertNotification(cmd *m.DeleteAlertNotificationCommand) error {
return inTransaction(func(sess *DBSession) error {
sql := "DELETE FROM alert_notification WHERE alert_notification.org_id = ? AND alert_notification.id = ?"
_, err := sess.Exec(sql, cmd.OrgId, cmd.Id)
return err
if _, err := sess.Exec(sql, cmd.OrgId, cmd.Id); err != nil {
return err
}
if _, err := sess.Exec("DELETE FROM alert_notification_state WHERE alert_notification_state.org_id = ? AND alert_notification_state.notifier_id = ?", cmd.OrgId, cmd.Id); err != nil {
return err
}
return nil
})
}
......@@ -229,44 +237,123 @@ func UpdateAlertNotification(cmd *m.UpdateAlertNotificationCommand) error {
})
}
func RecordNotificationJournal(ctx context.Context, cmd *m.RecordNotificationJournalCommand) error {
return withDbSession(ctx, func(sess *DBSession) error {
journalEntry := &m.AlertNotificationJournal{
OrgId: cmd.OrgId,
AlertId: cmd.AlertId,
NotifierId: cmd.NotifierId,
SentAt: cmd.SentAt,
Success: cmd.Success,
func SetAlertNotificationStateToCompleteCommand(ctx context.Context, cmd *m.SetAlertNotificationStateToCompleteCommand) error {
return inTransactionCtx(ctx, func(sess *DBSession) error {
version := cmd.Version
var current m.AlertNotificationState
sess.ID(cmd.Id).Get(&current)
newVersion := cmd.Version + 1
sql := `UPDATE alert_notification_state SET
state = ?,
version = ?,
updated_at = ?
WHERE
id = ?`
_, err := sess.Exec(sql, m.AlertNotificationStateCompleted, newVersion, timeNow().Unix(), cmd.Id)
if err != nil {
return err
}
_, err := sess.Insert(journalEntry)
return err
if current.Version != version {
sqlog.Error("notification state out of sync. the notification is marked as complete but has been modified between set as pending and completion.", "notifierId", current.NotifierId)
}
return nil
})
}
func GetLatestNotification(ctx context.Context, cmd *m.GetLatestNotificationQuery) error {
func SetAlertNotificationStateToPendingCommand(ctx context.Context, cmd *m.SetAlertNotificationStateToPendingCommand) error {
return withDbSession(ctx, func(sess *DBSession) error {
nj := []m.AlertNotificationJournal{}
err := sess.Desc("alert_notification_journal.sent_at").
Where("alert_notification_journal.org_id = ?", cmd.OrgId).
Where("alert_notification_journal.alert_id = ?", cmd.AlertId).
Where("alert_notification_journal.notifier_id = ?", cmd.NotifierId).
Find(&nj)
newVersion := cmd.Version + 1
sql := `UPDATE alert_notification_state SET
state = ?,
version = ?,
updated_at = ?,
alert_rule_state_updated_version = ?
WHERE
id = ? AND
(version = ? OR alert_rule_state_updated_version < ?)`
res, err := sess.Exec(sql,
m.AlertNotificationStatePending,
newVersion,
timeNow().Unix(),
cmd.AlertRuleStateUpdatedVersion,
cmd.Id,
cmd.Version,
cmd.AlertRuleStateUpdatedVersion)
if err != nil {
return err
}
cmd.Result = nj
affected, _ := res.RowsAffected()
if affected == 0 {
return m.ErrAlertNotificationStateVersionConflict
}
cmd.ResultVersion = newVersion
return nil
})
}
func CleanNotificationJournal(ctx context.Context, cmd *m.CleanNotificationJournalCommand) error {
func GetOrCreateAlertNotificationState(ctx context.Context, cmd *m.GetOrCreateNotificationStateQuery) error {
return inTransactionCtx(ctx, func(sess *DBSession) error {
sql := "DELETE FROM alert_notification_journal WHERE alert_notification_journal.org_id = ? AND alert_notification_journal.alert_id = ? AND alert_notification_journal.notifier_id = ?"
_, err := sess.Exec(sql, cmd.OrgId, cmd.AlertId, cmd.NotifierId)
return err
nj := &m.AlertNotificationState{}
exist, err := getAlertNotificationState(sess, cmd, nj)
// if exists, return it, otherwise create it with default values
if err != nil {
return err
}
if exist {
cmd.Result = nj
return nil
}
notificationState := &m.AlertNotificationState{
OrgId: cmd.OrgId,
AlertId: cmd.AlertId,
NotifierId: cmd.NotifierId,
State: m.AlertNotificationStateUnknown,
UpdatedAt: timeNow().Unix(),
}
if _, err := sess.Insert(notificationState); err != nil {
if dialect.IsUniqueConstraintViolation(err) {
exist, err = getAlertNotificationState(sess, cmd, nj)
if err != nil {
return err
}
if !exist {
return errors.New("Should not happen")
}
cmd.Result = nj
return nil
}
return err
}
cmd.Result = notificationState
return nil
})
}
func getAlertNotificationState(sess *DBSession, cmd *m.GetOrCreateNotificationStateQuery, nj *m.AlertNotificationState) (bool, error) {
return sess.
Where("alert_notification_state.org_id = ?", cmd.OrgId).
Where("alert_notification_state.alert_id = ?", cmd.AlertId).
Where("alert_notification_state.notifier_id = ?", cmd.NotifierId).
Get(nj)
}
......@@ -107,4 +107,27 @@ func addAlertMigrations(mg *Migrator) {
mg.AddMigration("create notification_journal table v1", NewAddTableMigration(notification_journal))
mg.AddMigration("add index notification_journal org_id & alert_id & notifier_id", NewAddIndexMigration(notification_journal, notification_journal.Indices[0]))
mg.AddMigration("drop alert_notification_journal", NewDropTableMigration("alert_notification_journal"))
alert_notification_state := Table{
Name: "alert_notification_state",
Columns: []*Column{
{Name: "id", Type: DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "org_id", Type: DB_BigInt, Nullable: false},
{Name: "alert_id", Type: DB_BigInt, Nullable: false},
{Name: "notifier_id", Type: DB_BigInt, Nullable: false},
{Name: "state", Type: DB_NVarchar, Length: 50, Nullable: false},
{Name: "version", Type: DB_BigInt, Nullable: false},
{Name: "updated_at", Type: DB_BigInt, Nullable: false},
{Name: "alert_rule_state_updated_version", Type: DB_BigInt, Nullable: false},
},
Indices: []*Index{
{Cols: []string{"org_id", "alert_id", "notifier_id"}, Type: UniqueIndex},
},
}
mg.AddMigration("create alert_notification_state table v1", NewAddTableMigration(alert_notification_state))
mg.AddMigration("add index alert_notification_state org_id & alert_id & notifier_id",
NewAddIndexMigration(alert_notification_state, alert_notification_state.Indices[0]))
}
......@@ -44,6 +44,8 @@ type Dialect interface {
CleanDB() error
NoOpSql() string
IsUniqueConstraintViolation(err error) bool
}
func NewDialect(engine *xorm.Engine) Dialect {
......
......@@ -5,6 +5,8 @@ import (
"strconv"
"strings"
"github.com/VividCortex/mysqlerr"
"github.com/go-sql-driver/mysql"
"github.com/go-xorm/xorm"
)
......@@ -125,3 +127,13 @@ func (db *Mysql) CleanDB() error {
return nil
}
func (db *Mysql) IsUniqueConstraintViolation(err error) bool {
if driverErr, ok := err.(*mysql.MySQLError); ok {
if driverErr.Number == mysqlerr.ER_DUP_ENTRY {
return true
}
}
return false
}
......@@ -6,6 +6,7 @@ import (
"strings"
"github.com/go-xorm/xorm"
"github.com/lib/pq"
)
type Postgres struct {
......@@ -136,3 +137,13 @@ func (db *Postgres) CleanDB() error {
return nil
}
func (db *Postgres) IsUniqueConstraintViolation(err error) bool {
if driverErr, ok := err.(*pq.Error); ok {
if driverErr.Code == "23505" {
return true
}
}
return false
}
......@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/go-xorm/xorm"
sqlite3 "github.com/mattn/go-sqlite3"
)
type Sqlite3 struct {
......@@ -82,3 +83,13 @@ func (db *Sqlite3) DropIndexSql(tableName string, index *Index) string {
func (db *Sqlite3) CleanDB() error {
return nil
}
func (db *Sqlite3) IsUniqueConstraintViolation(err error) bool {
if driverErr, ok := err.(sqlite3.Error); ok {
if driverErr.ExtendedCode == sqlite3.ErrConstraintUnique {
return true
}
}
return false
}
MIT License
Copyright (c) 2017 VividCortex
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment