Commit 7a34c129 by Torkel Ödegaard

feat(alerting): things are compiling and tests pass

parent 0cbf4ae7
@@ -27,7 +27,7 @@ type AlertRule struct {
Updated time.Time `json:"updated"`
}
func (this *AlertRule) Equals(other AlertRule) bool {
func (this *AlertRule) Equals(other *AlertRule) bool {
result := false
result = result || this.Aggregator != other.Aggregator
@@ -72,7 +72,7 @@ type SaveAlertsCommand struct {
UserId int64
OrgId int64
Alerts []AlertRule
Alerts []*AlertRule
}
type DeleteAlertCommand struct {
@@ -86,23 +86,23 @@ type GetAlertsQuery struct {
DashboardId int64
PanelId int64
Result []AlertRule
Result []*AlertRule
}
type GetAllAlertsQuery struct {
Result []AlertRule
Result []*AlertRule
}
type GetAlertsForExecutionQuery struct {
Timestamp int64
Result []AlertRule
Result []*AlertRule
}
type GetAlertByIdQuery struct {
Id int64
Result AlertRule
Result *AlertRule
}
type GetAlertChangesQuery struct {
@@ -110,5 +110,5 @@ type GetAlertChangesQuery struct {
Limit int64
SinceId int64
Result []AlertRuleChange
Result []*AlertRuleChange
}
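
The command and query types above now carry pointer slices (`[]*AlertRule`) instead of value slices. A minimal, self-contained sketch (trimmed stand-in types, not the real models package) of what that buys callers: a rule fetched through a query can be updated in place, and every holder of the same `*AlertRule` sees the change.

```go
package main

import "fmt"

// Trimmed stand-ins for m.AlertRule and m.GetAllAlertsQuery, reduced to the
// fields needed for this illustration.
type AlertRule struct {
	Id    int64
	State string
}

type GetAllAlertsQuery struct {
	Result []*AlertRule // was []AlertRule before this commit
}

func main() {
	q := &GetAllAlertsQuery{Result: []*AlertRule{{Id: 1, State: "OK"}}}

	// With pointers in Result, a caller mutates the rule itself rather than
	// a copy, so the scheduler and the store can share one instance.
	rule := q.Result[0]
	rule.State = "CRITICAL"
	fmt.Println(q.Result[0].State) // CRITICAL
}
```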
package alerting
import (
"sync"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
)
type RuleReader interface {
Fetch() []AlertRule
}
type AlertRuleReader struct {
sync.RWMutex
serverID string
serverPosition int
clusterSize int
}
func NewRuleReader() *AlertRuleReader {
ruleReader := &AlertRuleReader{}
go ruleReader.initReader()
return ruleReader
}
var (
alertJobs []AlertRule
)
func (arr *AlertRuleReader) Fetch() []AlertRule {
return alertJobs
}
func (arr *AlertRuleReader) initReader() {
alertJobs = make([]AlertRule, 0)
heartbeat := time.NewTicker(time.Second * 10)
arr.updateRules()
for {
select {
case <-heartbeat.C:
arr.updateRules()
}
}
}
func (arr *AlertRuleReader) updateRules() {
arr.Lock()
defer arr.Unlock()
cmd := &m.GetAllAlertsQuery{}
err := bus.Dispatch(cmd)
if err == nil {
//alertJobs = cmd.Result
} else {
log.Error(1, "AlertRuleReader: Could not load alerts")
}
}
func (arr *AlertRuleReader) heartBeat() {
// Let's cheat on this until we focus on clustering
//log.Info("Heartbeat: Sending heartbeat from " + this.serverId)
arr.clusterSize = 1
arr.serverPosition = 1
/*
cmd := &m.HeartBeatCommand{ServerId: this.serverId}
err := bus.Dispatch(cmd)
if err != nil {
log.Error(1, "Failed to send heartbeat.")
} else {
this.clusterSize = cmd.Result.ClusterSize
this.serverPosition = cmd.Result.UptimePosition
}
*/
}
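
In updateRules above, the assignment to alertJobs stays commented out: cmd.Result is now `[]*m.AlertRule`, while the reader holds the alerting package's own `[]AlertRule`. Below is a sketch (not part of the commit) of the conversion that would be needed before the assignment can come back; everything beyond the Id, Aggregator and State fields visible in this diff is an assumed mapping.

```go
// Sketch of updateRules once a conversion helper exists.
func (arr *AlertRuleReader) updateRules() {
	arr.Lock()
	defer arr.Unlock()

	cmd := &m.GetAllAlertsQuery{}
	if err := bus.Dispatch(cmd); err != nil {
		log.Error(1, "AlertRuleReader: Could not load alerts")
		return
	}
	alertJobs = convertRules(cmd.Result)
}

// convertRules maps the models rule onto the alerting package's rule.
// Title and Frequency are assumed to exist on both types.
func convertRules(src []*m.AlertRule) []AlertRule {
	rules := make([]AlertRule, 0, len(src))
	for _, r := range src {
		rules = append(rules, AlertRule{
			Id:         r.Id,
			Title:      r.Title,
			Frequency:  r.Frequency,
			Aggregator: r.Aggregator,
			State:      r.State,
		})
	}
	return rules
}
```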
@@ -6,8 +6,8 @@ import (
m "github.com/grafana/grafana/pkg/models"
)
func ParseAlertsFromDashboard(cmd *m.SaveDashboardCommand) []m.AlertRule {
alerts := make([]m.AlertRule, 0)
func ParseAlertsFromDashboard(cmd *m.SaveDashboardCommand) []*m.AlertRule {
alerts := make([]*m.AlertRule, 0)
for _, rowObj := range cmd.Dashboard.Get("rows").MustArray() {
row := simplejson.NewFromAny(rowObj)
@@ -16,7 +16,7 @@ func ParseAlertsFromDashboard(cmd *m.SaveDashboardCommand) []m.AlertRule {
panel := simplejson.NewFromAny(panelObj)
alerting := panel.Get("alerting")
alert := m.AlertRule{
alert := &m.AlertRule{
DashboardId: cmd.Result.Id,
OrgId: cmd.Result.OrgId,
PanelId: panel.Get("id").MustInt64(),
......
package alerting
import (
"fmt"
"time"
"github.com/Unknwon/log"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/services/alerting/alertstates"
)
type Engine struct {
@@ -13,6 +15,8 @@ type Engine struct {
clock clock.Clock
ticker *Ticker
scheduler Scheduler
executor Executor
ruleReader RuleReader
}
func NewEngine() *Engine {
@@ -21,24 +25,37 @@ func NewEngine() *Engine {
execQueue: make(chan *AlertJob, 1000),
resultQueue: make(chan *AlertResult, 1000),
scheduler: NewScheduler(),
executor: &ExecutorImpl{},
ruleReader: NewRuleReader(),
}
return e
}
func (e *Engine) Start() {
log.Info("Alerting: Engine.Start()")
go e.schedulerTick()
go e.execDispatch()
go e.resultHandler()
}
func (e *Engine) Stop() {
close(e.execQueue)
close(e.resultQueue)
}
func (e *Engine) schedulerTick() {
tickIndex := 0
for {
select {
case tick := <-e.ticker.C:
// update rules every tenth tick
if tickIndex%10 == 0 {
e.scheduler.Update(e.ruleReader.Fetch())
}
e.scheduler.Tick(tick, e.execQueue)
}
}
@@ -46,8 +63,52 @@ func (e *Engine) schedulerTick() {
func (e *Engine) execDispatch() {
for job := range e.execQueue {
log.Info("AlertEngine: Dispatching alert job %s", job.Rule.Title)
log.Trace("Alerting: Engine:execDispatch() starting job %s", job.Rule.Title)
job.Running = true
//scheduler.measureAndExecute(executor, job)
e.executeJob(job)
}
}
func (e *Engine) executeJob(job *AlertJob) {
now := time.Now()
resultChan := make(chan *AlertResult, 1)
go e.executor.Execute(job, resultChan)
select {
case <-time.After(time.Second * 5):
e.resultQueue <- &AlertResult{
Id: job.Rule.Id,
State: alertstates.Pending,
Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
AlertJob: job,
}
case result := <-resultChan:
result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
log.Trace("Alerting: engine.executeJob(): execution took %vms", result.Duration)
e.resultQueue <- result
}
}
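
executeJob races the executor against a five second deadline: whichever of the timeout or the result channel fires first decides what goes onto resultQueue, so a slow check is reported as Pending instead of blocking the engine. The pattern in isolation, as a runnable sketch with a stand-in worker (the real code uses Executor.Execute and a 5s timeout):

```go
package main

import (
	"fmt"
	"time"
)

type result struct{ state string }

// slowWorker stands in for Executor.Execute: it delivers its result whenever
// it finishes, however long that takes.
func slowWorker(out chan *result, delay time.Duration) {
	time.Sleep(delay)
	out <- &result{state: "ok"}
}

// runWithTimeout mirrors the shape of engine.executeJob. The channel is
// buffered with capacity 1 so a late worker can still complete its send
// after the timeout branch has already won, instead of blocking forever.
func runWithTimeout(delay time.Duration) *result {
	resultChan := make(chan *result, 1)
	go slowWorker(resultChan, delay)

	select {
	case <-time.After(100 * time.Millisecond): // five seconds in the engine
		return &result{state: "pending"}
	case r := <-resultChan:
		return r
	}
}

func main() {
	fmt.Println(runWithTimeout(10 * time.Millisecond).state)  // ok
	fmt.Println(runWithTimeout(500 * time.Millisecond).state) // pending
}
```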
func (e *Engine) resultHandler() {
for result := range e.resultQueue {
log.Debug("Alerting: engine.resultHandler(): alert(%d) status(%s) actual(%v) retry(%d)", result.Id, result.State, result.ActualValue, result.AlertJob.RetryCount)
result.AlertJob.Running = false
if result.IsResultIncomplete() {
result.AlertJob.RetryCount++
if result.AlertJob.RetryCount < maxRetries {
e.execQueue <- result.AlertJob
} else {
saveState(&AlertResult{
Id: result.Id,
State: alertstates.Critical,
Description: fmt.Sprintf("Failed to run check after %d retries", maxRetries),
})
}
} else {
result.AlertJob.RetryCount = 0
saveState(result)
}
}
}
@@ -17,7 +17,8 @@ var (
descriptionFmt = "Actual value: %1.2f for %s"
)
type ExecutorImpl struct{}
type ExecutorImpl struct {
}
type compareFn func(float64, float64) bool
type aggregationFn func(*tsdb.TimeSeries) float64
@@ -76,16 +77,16 @@ var aggregator = map[string]aggregationFn{
},
}
func (executor *ExecutorImpl) Execute(job *AlertJob, responseQueue chan *AlertResult) {
func (executor *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
response, err := executor.GetSeries(job)
if err != nil {
responseQueue <- &AlertResult{State: alertstates.Pending, Id: job.Rule.Id, AlertJob: job}
resultQueue <- &AlertResult{State: alertstates.Pending, Id: job.Rule.Id, AlertJob: job}
}
result := executor.validateRule(job.Rule, response)
result.AlertJob = job
responseQueue <- result
resultQueue <- result
}
func (executor *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, error) {
......
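
One detail of the Execute hunk above: when GetSeries fails, a Pending result is queued but the function still falls through to validateRule with an empty response. If the intent is to stop after reporting Pending (an assumption; the commit does not say), the early-return form would be:

```go
func (executor *ExecutorImpl) Execute(job *AlertJob, resultQueue chan *AlertResult) {
	response, err := executor.GetSeries(job)
	if err != nil {
		resultQueue <- &AlertResult{State: alertstates.Pending, Id: job.Rule.Id, AlertJob: job}
		return // assumed intent: do not validate a failed fetch
	}

	result := executor.validateRule(job.Rule, response)
	result.AlertJob = job
	resultQueue <- result
}
```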
package alerting
import (
m "github.com/grafana/grafana/pkg/models"
. "github.com/smartystreets/goconvey/convey"
"testing"
"github.com/grafana/grafana/pkg/services/alerting/alertstates"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
func TestAlertingExecutor(t *testing.T) {
@@ -12,95 +14,95 @@ func TestAlertingExecutor(t *testing.T) {
Convey("single time serie", func() {
Convey("Show return ok since avg is above 2", func() {
rule := m.AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "sum"}
rule := &AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "sum"}
timeSeries := []*m.TimeSeries{
m.NewTimeSeries("test1", [][2]float64{{2, 0}}),
timeSeries := []*tsdb.TimeSeries{
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
}
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateOk)
So(result.State, ShouldEqual, alertstates.Ok)
})
Convey("Show return critical since below 2", func() {
rule := m.AlertRule{CritLevel: 10, CritOperator: "<", Aggregator: "sum"}
rule := &AlertRule{CritLevel: 10, CritOperator: "<", Aggregator: "sum"}
timeSeries := []*m.TimeSeries{
m.NewTimeSeries("test1", [][2]float64{{2, 0}}),
timeSeries := []*tsdb.TimeSeries{
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
}
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateCritical)
So(result.State, ShouldEqual, alertstates.Critical)
})
Convey("Show return critical since sum is above 10", func() {
rule := m.AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "sum"}
rule := &AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "sum"}
timeSeries := []*m.TimeSeries{
m.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
timeSeries := []*tsdb.TimeSeries{
tsdb.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
}
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateCritical)
So(result.State, ShouldEqual, alertstates.Critical)
})
Convey("Show return ok since avg is below 10", func() {
rule := m.AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "avg"}
rule := &AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "avg"}
timeSeries := []*m.TimeSeries{
m.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
timeSeries := []*tsdb.TimeSeries{
tsdb.NewTimeSeries("test1", [][2]float64{{9, 0}, {9, 0}}),
}
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateOk)
So(result.State, ShouldEqual, alertstates.Ok)
})
Convey("Show return ok since min is below 10", func() {
rule := m.AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "min"}
rule := &AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "min"}
timeSeries := []*m.TimeSeries{
m.NewTimeSeries("test1", [][2]float64{{11, 0}, {9, 0}}),
timeSeries := []*tsdb.TimeSeries{
tsdb.NewTimeSeries("test1", [][2]float64{{11, 0}, {9, 0}}),
}
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateOk)
So(result.State, ShouldEqual, alertstates.Ok)
})
Convey("Show return ok since max is above 10", func() {
rule := m.AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "max"}
rule := &AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "max"}
timeSeries := []*m.TimeSeries{
m.NewTimeSeries("test1", [][2]float64{{1, 0}, {11, 0}}),
timeSeries := []*tsdb.TimeSeries{
tsdb.NewTimeSeries("test1", [][2]float64{{1, 0}, {11, 0}}),
}
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateCritical)
So(result.State, ShouldEqual, alertstates.Critical)
})
})
Convey("muliple time series", func() {
Convey("both are ok", func() {
rule := m.AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "sum"}
rule := &AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "sum"}
timeSeries := []*m.TimeSeries{
m.NewTimeSeries("test1", [][2]float64{{2, 0}}),
m.NewTimeSeries("test1", [][2]float64{{2, 0}}),
timeSeries := []*tsdb.TimeSeries{
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
}
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateOk)
So(result.State, ShouldEqual, alertstates.Ok)
})
Convey("first serie is good, second is critical", func() {
rule := m.AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "sum"}
rule := &AlertRule{CritLevel: 10, CritOperator: ">", Aggregator: "sum"}
timeSeries := []*m.TimeSeries{
m.NewTimeSeries("test1", [][2]float64{{2, 0}}),
m.NewTimeSeries("test1", [][2]float64{{11, 0}}),
timeSeries := []*tsdb.TimeSeries{
tsdb.NewTimeSeries("test1", [][2]float64{{2, 0}}),
tsdb.NewTimeSeries("test1", [][2]float64{{11, 0}}),
}
result := executor.validateRule(rule, timeSeries)
So(result.State, ShouldEqual, m.AlertStateCritical)
So(result.State, ShouldEqual, alertstates.Critical)
})
})
})
......
@@ -8,5 +8,5 @@ type Executor interface {
type Scheduler interface {
Tick(time time.Time, execQueue chan *AlertJob)
Update(rules []*AlertRule)
Update(rules []AlertRule)
}
@@ -40,4 +40,5 @@ type AlertRule struct {
Description string
QueryRange int
Aggregator string
State string
}
@@ -16,15 +16,15 @@ func NewScheduler() Scheduler {
}
}
func (scheduler *SchedulerImpl) Update(rules []*AlertRule) {
func (s *SchedulerImpl) Update(rules []AlertRule) {
log.Debug("Scheduler: Update()")
jobs := make(map[int64]*AlertJob, 0)
for i, rule := range rules {
var job *AlertJob
if scheduler.jobs[rule.Id] != nil {
job = scheduler.jobs[rule.Id]
if s.jobs[rule.Id] != nil {
job = s.jobs[rule.Id]
} else {
job = &AlertJob{
Running: false,
@@ -32,67 +32,25 @@ func (scheduler *SchedulerImpl) Update(rules []*AlertRule) {
}
}
job.Rule = rule
job.Rule = &rule
job.Offset = int64(i)
jobs[rule.Id] = job
}
log.Debug("Scheduler: Selected %d jobs", len(jobs))
scheduler.jobs = jobs
s.jobs = jobs
}
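
Update now receives rules by value and stores &rule on each job. With the loop-variable semantics Go had at the time of this commit (before Go 1.22), rule is one variable reused across iterations, so every job would end up pointing at the same rule. A small self-contained sketch of the form that avoids the aliasing in any Go version, by taking the address of the slice element instead:

```go
package main

import "fmt"

type rule struct{ id int64 }
type job struct{ r *rule }

func main() {
	rules := []rule{{1}, {2}, {3}}
	jobs := make([]*job, 0, len(rules))

	// &rules[i] points at the element itself, so each job keeps its own
	// rule; &rule inside a for-range (as Update does) would alias the loop
	// variable under pre-1.22 semantics.
	for i := range rules {
		jobs = append(jobs, &job{r: &rules[i]})
	}

	for _, j := range jobs {
		fmt.Println(j.r.id) // 1 2 3
	}
}
```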
func (scheduler *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *AlertJob) {
func (s *SchedulerImpl) Tick(tickTime time.Time, execQueue chan *AlertJob) {
now := tickTime.Unix()
for _, job := range scheduler.jobs {
log.Info("Alerting: Scheduler.Tick() %v", len(s.jobs))
for _, job := range s.jobs {
if now%job.Rule.Frequency == 0 && job.Running == false {
log.Trace("Scheduler: Putting job on to exec queue: %s", job.Rule.Title)
execQueue <- job
}
}
}
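
Tick enqueues a job whenever the tick's Unix timestamp is an exact multiple of the rule's Frequency and the job is not already running; the Offset set in Update does not take part in this check. A runnable sketch of the modulo condition (the ten-second frequency is just an example value):

```go
package main

import (
	"fmt"
	"time"
)

// shouldRun mirrors the condition in SchedulerImpl.Tick: fire when the
// tick's Unix time lands on a multiple of the rule's frequency and the
// previous run has finished.
func shouldRun(tick time.Time, frequencySec int64, running bool) bool {
	return tick.Unix()%frequencySec == 0 && !running
}

func main() {
	base := time.Unix(1000, 0) // arbitrary start; 1000 is a multiple of 10
	for i := int64(0); i < 12; i++ {
		tick := base.Add(time.Duration(i) * time.Second)
		if shouldRun(tick, 10, false) {
			fmt.Println("enqueue at", tick.Unix()) // 1000 and 1010
		}
	}
}
```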
// func (scheduler *Scheduler) handleResponses() {
// for response := range scheduler.responseQueue {
// log.Info("Response: alert(%d) status(%s) actual(%v) retry(%d)", response.Id, response.State, response.ActualValue, response.AlertJob.RetryCount)
// response.AlertJob.Running = false
//
// if response.IsResultIncomplete() {
// response.AlertJob.RetryCount++
// if response.AlertJob.RetryCount < maxRetries {
// scheduler.runQueue <- response.AlertJob
// } else {
// saveState(&AlertResult{
// Id: response.Id,
// State: alertstates.Critical,
// Description: fmt.Sprintf("Failed to run check after %d retires", maxRetries),
// })
// }
// } else {
// response.AlertJob.RetryCount = 0
// saveState(response)
// }
// }
// }
//
// func (scheduler *Scheduler) measureAndExecute(exec Executor, job *AlertJob) {
// now := time.Now()
//
// responseChan := make(chan *AlertResult, 1)
// go exec.Execute(job, responseChan)
//
// select {
// case <-time.After(time.Second * 5):
// scheduler.responseQueue <- &AlertResult{
// Id: job.Rule.Id,
// State: alertstates.Pending,
// Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
// AlertJob: job,
// }
// case result := <-responseChan:
// result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
// log.Info("Schedular: exeuction took %vms", result.Duration)
// scheduler.responseQueue <- result
// }
// }
@@ -29,12 +29,12 @@ func GetAlertById(query *m.GetAlertByIdQuery) error {
return err
}
query.Result = alert
query.Result = &alert
return nil
}
func GetAllAlertQueryHandler(query *m.GetAllAlertsQuery) error {
var alerts []m.AlertRule
var alerts []*m.AlertRule
err := x.Sql("select * from alert_rule").Find(&alerts)
if err != nil {
return err
@@ -87,7 +87,7 @@ func HandleAlertsQuery(query *m.GetAlertsQuery) error {
sql.WriteString(")")
}
alerts := make([]m.AlertRule, 0)
alerts := make([]*m.AlertRule, 0)
if err := x.Sql(sql.String(), params...).Find(&alerts); err != nil {
return err
}
@@ -97,7 +97,7 @@ func HandleAlertsQuery(query *m.GetAlertsQuery) error {
}
func DeleteAlertDefinition(dashboardId int64, sess *xorm.Session) error {
alerts := make([]m.AlertRule, 0)
alerts := make([]*m.AlertRule, 0)
sess.Where("dashboard_id = ?", dashboardId).Find(&alerts)
for _, alert := range alerts {
@@ -128,10 +128,10 @@ func SaveAlerts(cmd *m.SaveAlertsCommand) error {
})
}
func upsertAlerts(alerts []m.AlertRule, posted []m.AlertRule, sess *xorm.Session) error {
func upsertAlerts(alerts []*m.AlertRule, posted []*m.AlertRule, sess *xorm.Session) error {
for _, alert := range posted {
update := false
var alertToUpdate m.AlertRule
var alertToUpdate *m.AlertRule
for _, k := range alerts {
if alert.PanelId == k.PanelId {
@@ -145,7 +145,7 @@ func upsertAlerts(alerts []m.AlertRule, posted []m.AlertRule, sess *xorm.Session
if alertToUpdate.Equals(alert) {
alert.Updated = time.Now()
alert.State = alertToUpdate.State
_, err := sess.Id(alert.Id).Update(&alert)
_, err := sess.Id(alert.Id).Update(alert)
if err != nil {
return err
}
@@ -157,7 +157,7 @@ func upsertAlerts(alerts []m.AlertRule, posted []m.AlertRule, sess *xorm.Session
alert.Updated = time.Now()
alert.Created = time.Now()
alert.State = "OK"
_, err := sess.Insert(&alert)
_, err := sess.Insert(alert)
if err != nil {
return err
}
@@ -168,7 +168,7 @@ func upsertAlerts(alerts []m.AlertRule, posted []m.AlertRule, sess *xorm.Session
return nil
}
func deleteMissingAlerts(alerts []m.AlertRule, posted []m.AlertRule, sess *xorm.Session) error {
func deleteMissingAlerts(alerts []*m.AlertRule, posted []*m.AlertRule, sess *xorm.Session) error {
for _, missingAlert := range alerts {
missing := true
@@ -194,12 +194,12 @@ func deleteMissingAlerts(alerts []m.AlertRule, posted []m.AlertRule, sess *xorm.
return nil
}
func GetAlertsByDashboardId2(dashboardId int64, sess *xorm.Session) ([]m.AlertRule, error) {
alerts := make([]m.AlertRule, 0)
func GetAlertsByDashboardId2(dashboardId int64, sess *xorm.Session) ([]*m.AlertRule, error) {
alerts := make([]*m.AlertRule, 0)
err := sess.Where("dashboard_id = ?", dashboardId).Find(&alerts)
if err != nil {
return []m.AlertRule{}, err
return []*m.AlertRule{}, err
}
return alerts, nil
......
@@ -2,10 +2,11 @@ package sqlstore
import (
"bytes"
"time"
"github.com/go-xorm/xorm"
"github.com/grafana/grafana/pkg/bus"
m "github.com/grafana/grafana/pkg/models"
"time"
)
func init() {
@@ -38,7 +39,7 @@ func GetAlertRuleChanges(query *m.GetAlertChangesQuery) error {
params = append(params, query.Limit)
}
alertChanges := make([]m.AlertRuleChange, 0)
alertChanges := make([]*m.AlertRuleChange, 0)
if err := x.Sql(sql.String(), params...).Find(&alertChanges); err != nil {
return err
}
@@ -47,7 +48,7 @@ func GetAlertRuleChanges(query *m.GetAlertChangesQuery) error {
return nil
}
func SaveAlertChange(change string, alert m.AlertRule, sess *xorm.Session) error {
func SaveAlertChange(change string, alert *m.AlertRule, sess *xorm.Session) error {
_, err := sess.Insert(&m.AlertRuleChange{
OrgId: alert.OrgId,
Type: change,
......
@@ -20,8 +20,8 @@ func TestAlertRuleChangesDataAccess(t *testing.T) {
var err error
Convey("When dashboard is removed", func() {
items := []m.AlertRule{
{
items := []*m.AlertRule{
&m.AlertRule{
PanelId: 1,
DashboardId: testDash.Id,
Query: "Query",
......
@@ -14,8 +14,8 @@ func TestAlertingDataAccess(t *testing.T) {
testDash := insertTestDashboard("dashboard with alerts", 1, "alert")
items := []m.AlertRule{
{
items := []*m.AlertRule{
&m.AlertRule{
PanelId: 1,
DashboardId: testDash.Id,
OrgId: testDash.OrgId,
@@ -116,20 +116,20 @@ func TestAlertingDataAccess(t *testing.T) {
})
Convey("Multiple alerts per dashboard", func() {
multipleItems := []m.AlertRule{
{
multipleItems := []*m.AlertRule{
&m.AlertRule{
DashboardId: testDash.Id,
PanelId: 1,
Query: "1",
OrgId: 1,
},
{
&m.AlertRule{
DashboardId: testDash.Id,
PanelId: 2,
Query: "2",
OrgId: 1,
},
{
&m.AlertRule{
DashboardId: testDash.Id,
PanelId: 3,
Query: "3",
@@ -178,8 +178,8 @@ func TestAlertingDataAccess(t *testing.T) {
})
Convey("When dashboard is removed", func() {
items := []m.AlertRule{
{
items := []*m.AlertRule{
&m.AlertRule{
PanelId: 1,
DashboardId: testDash.Id,
Query: "Query",
......
@@ -13,8 +13,8 @@ func TestAlertingStateAccess(t *testing.T) {
testDash := insertTestDashboard("dashboard with alerts", 1, "alert")
items := []m.AlertRule{
{
items := []*m.AlertRule{
&m.AlertRule{
PanelId: 1,
DashboardId: testDash.Id,
OrgId: testDash.OrgId,
......