Commit a191b9b1 by Torkel Ödegaard

feat(alerting): moved alerting models back to alerting package, models is more for storage dtos

parent 70cb8400
......@@ -112,24 +112,3 @@ type GetAlertChangesQuery struct {
Result []AlertRuleChange
}
type AlertJob struct {
Offset int64
Delay bool
Running bool
RetryCount int
Rule AlertRule
}
type AlertResult struct {
Id int64
State string
ActualValue float64
Duration float64
Description string
AlertJob *AlertJob
}
func (ar *AlertResult) IsResultIncomplete() bool {
return ar.State == AlertStatePending
}
......@@ -2,6 +2,8 @@ package models
import (
"time"
"github.com/grafana/grafana/pkg/services/alerting/alertstates"
)
type AlertState struct {
......@@ -13,25 +15,8 @@ type AlertState struct {
Info string `json:"info"`
}
var (
VALID_STATES = []string{
AlertStateOk,
AlertStateWarn,
AlertStateCritical,
AlertStateAcknowledged,
AlertStateMaintenance,
}
AlertStateOk = "OK"
AlertStateWarn = "WARN"
AlertStateCritical = "CRITICAL"
AlertStateAcknowledged = "ACKNOWLEDGED"
AlertStateMaintenance = "MAINTENANCE"
AlertStatePending = "PENDING"
)
func (this *UpdateAlertStateCommand) IsValidState() bool {
for _, v := range VALID_STATES {
for _, v := range alertstates.ValidStates {
if this.NewState == v {
return true
}
......
package models
type TimeSeries struct {
Name string `json:"name"`
Points [][2]float64 `json:"points"`
}
type TimeSeriesSlice []*TimeSeries
func NewTimeSeries(name string, points [][2]float64) *TimeSeries {
return &TimeSeries{
Name: name,
Points: points,
}
}
......@@ -10,7 +10,7 @@ import (
)
type RuleReader interface {
Fetch() []m.AlertRule
Fetch() []AlertRule
}
type AlertRuleReader struct {
......@@ -28,15 +28,15 @@ func NewRuleReader() *AlertRuleReader {
}
var (
alertJobs []m.AlertRule
alertJobs []AlertRule
)
func (arr *AlertRuleReader) Fetch() []m.AlertRule {
func (arr *AlertRuleReader) Fetch() []AlertRule {
return alertJobs
}
func (arr *AlertRuleReader) initReader() {
alertJobs = make([]m.AlertRule, 0)
alertJobs = make([]AlertRule, 0)
heartbeat := time.NewTicker(time.Second * 10)
arr.updateRules()
......@@ -56,7 +56,7 @@ func (arr *AlertRuleReader) updateRules() {
err := bus.Dispatch(cmd)
if err == nil {
alertJobs = cmd.Result
//alertJobs = cmd.Result
} else {
log.Error(1, "AlertRuleReader: Could not load alerts")
}
......
package alerting
import (
"fmt"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
......@@ -27,111 +24,9 @@ func Init() {
go scheduler.dispatch(reader)
go scheduler.executor(&ExecutorImpl{})
go scheduler.handleResponses()
}
type Scheduler struct {
jobs map[int64]*m.AlertJob
runQueue chan *m.AlertJob
responseQueue chan *m.AlertResult
alertRuleFetcher RuleReader
}
func NewScheduler() *Scheduler {
return &Scheduler{
jobs: make(map[int64]*m.AlertJob, 0),
runQueue: make(chan *m.AlertJob, 1000),
responseQueue: make(chan *m.AlertResult, 1000),
}
}
func (scheduler *Scheduler) dispatch(reader RuleReader) {
reschedule := time.NewTicker(time.Second * 10)
secondTicker := time.NewTicker(time.Second)
scheduler.updateJobs(reader.Fetch)
for {
select {
case <-secondTicker.C:
scheduler.queueJobs()
case <-reschedule.C:
scheduler.updateJobs(reader.Fetch)
}
}
}
func (scheduler *Scheduler) updateJobs(alertRuleFn func() []m.AlertRule) {
log.Debug("Scheduler: UpdateJobs()")
jobs := make(map[int64]*m.AlertJob, 0)
rules := alertRuleFn()
for i, rule := range rules {
var job *m.AlertJob
if scheduler.jobs[rule.Id] != nil {
job = scheduler.jobs[rule.Id]
} else {
job = &m.AlertJob{
Running: false,
RetryCount: 0,
}
}
job.Rule = rule
job.Offset = int64(i)
jobs[rule.Id] = job
}
log.Debug("Scheduler: Selected %d jobs", len(jobs))
scheduler.jobs = jobs
}
func (scheduler *Scheduler) queueJobs() {
now := time.Now().Unix()
for _, job := range scheduler.jobs {
if now%job.Rule.Frequency == 0 && job.Running == false {
log.Info("Scheduler: Putting job on to run queue: %s", job.Rule.Title)
scheduler.runQueue <- job
}
}
}
func (scheduler *Scheduler) executor(executor Executor) {
for job := range scheduler.runQueue {
//log.Info("Executor: queue length %d", len(this.runQueue))
log.Info("Executor: executing %s", job.Rule.Title)
job.Running = true
scheduler.measureAndExecute(executor, job)
}
}
func (scheduler *Scheduler) handleResponses() {
for response := range scheduler.responseQueue {
log.Info("Response: alert(%d) status(%s) actual(%v) retry(%d)", response.Id, response.State, response.ActualValue, response.AlertJob.RetryCount)
response.AlertJob.Running = false
if response.IsResultIncomplete() {
response.AlertJob.RetryCount++
if response.AlertJob.RetryCount < maxRetries {
scheduler.runQueue <- response.AlertJob
} else {
saveState(&m.AlertResult{
Id: response.Id,
State: m.AlertStateCritical,
Description: fmt.Sprintf("Failed to run check after %d retires", maxRetries),
})
}
} else {
response.AlertJob.RetryCount = 0
saveState(response)
}
}
}
func saveState(response *m.AlertResult) {
func saveState(response *AlertResult) {
cmd := &m.UpdateAlertStateCommand{
AlertId: response.Id,
NewState: response.State,
......@@ -142,24 +37,3 @@ func saveState(response *m.AlertResult) {
log.Error(2, "failed to save state %v", err)
}
}
func (scheduler *Scheduler) measureAndExecute(exec Executor, job *m.AlertJob) {
now := time.Now()
responseChan := make(chan *m.AlertResult, 1)
go exec.Execute(job, responseChan)
select {
case <-time.After(time.Second * 5):
scheduler.responseQueue <- &m.AlertResult{
Id: job.Rule.Id,
State: m.AlertStatePending,
Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
AlertJob: job,
}
case result := <-responseChan:
result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
log.Info("Schedular: exeuction took %vms", result.Duration)
scheduler.responseQueue <- result
}
}
package alertstates
var (
ValidStates = []string{
Ok,
Warn,
Critical,
Acknowledged,
Maintenance,
}
Ok = "OK"
Warn = "WARN"
Critical = "CRITICAL"
Acknowledged = "ACKNOWLEDGED"
Maintenance = "MAINTENANCE"
Pending = "PENDING"
)
package graphite
import (
"fmt"
"github.com/grafana/grafana/pkg/bus"
m "github.com/grafana/grafana/pkg/models"
)
// AlertDatasource is bacon
type AlertDatasource interface {
GetSeries(job *m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error)
}
package datasources
// GetSeries returns timeseries data from the datasource
func GetSeries(job *m.AlertJob) (m.TimeSeriesSlice, error) {
query := &m.GetDataSourceByIdQuery{
Id: job.Rule.DatasourceId,
OrgId: job.Rule.OrgId,
}
err := bus.Dispatch(query)
if err != nil {
return nil, fmt.Errorf("Could not find datasource for %d", job.Rule.DatasourceId)
}
if query.Result.Type == m.DS_GRAPHITE {
return GraphiteClient{}.GetSeries(*job, query.Result)
}
return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type)
}
package graphite
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/util"
)
type GraphiteClient struct{}
type GraphiteSerie struct {
Datapoints [][2]float64
Target string
}
var DefaultClient = &http.Client{
Timeout: time.Minute,
}
type GraphiteResponse []GraphiteSerie
func (client GraphiteClient) GetSeries(rule m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) {
v := url.Values{
"format": []string{"json"},
"target": []string{getTargetFromRule(rule.Rule)},
"until": []string{"now"},
"from": []string{"-" + strconv.Itoa(rule.Rule.QueryRange) + "s"},
}
log.Trace("Graphite: sending request with querystring: ", v.Encode())
req, err := http.NewRequest("POST", datasource.Url+"/render", nil)
if err != nil {
return nil, fmt.Errorf("Could not create request")
}
req.Body = ioutil.NopCloser(bytes.NewReader([]byte(v.Encode())))
if datasource.BasicAuth {
req.Header.Add("Authorization", util.GetBasicAuthHeader(datasource.User, datasource.Password))
}
res, err := DefaultClient.Do(req)
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("expected httpstatus 200, found %d", res.StatusCode)
}
response := GraphiteResponse{}
json.NewDecoder(res.Body).Decode(&response)
var timeSeries []*m.TimeSeries
for _, v := range response {
timeSeries = append(timeSeries, m.NewTimeSeries(v.Target, v.Datapoints))
}
return timeSeries, nil
}
func getTargetFromRule(rule m.AlertRule) string {
json, _ := simplejson.NewJson([]byte(rule.Query))
return json.Get("target").MustString()
}
package datasources
// import (
// "bytes"
// "encoding/json"
// "fmt"
// "io/ioutil"
// "net/http"
// "net/url"
// "strconv"
// "time"
//
// "github.com/grafana/grafana/pkg/components/simplejson"
// "github.com/grafana/grafana/pkg/log"
// m "github.com/grafana/grafana/pkg/models"
// "github.com/grafana/grafana/pkg/util"
// )
//
// type GraphiteClient struct{}
//
// type GraphiteSerie struct {
// Datapoints [][2]float64
// Target string
// }
//
// var DefaultClient = &http.Client{
// Timeout: time.Minute,
// }
//
// type GraphiteResponse []GraphiteSerie
//
// func (client GraphiteClient) GetSeries(rule m.AlertJob, datasource m.DataSource) (m.TimeSeriesSlice, error) {
// v := url.Values{
// "format": []string{"json"},
// "target": []string{getTargetFromRule(rule.Rule)},
// "until": []string{"now"},
// "from": []string{"-" + strconv.Itoa(rule.Rule.QueryRange) + "s"},
// }
//
// log.Trace("Graphite: sending request with querystring: ", v.Encode())
//
// req, err := http.NewRequest("POST", datasource.Url+"/render", nil)
//
// if err != nil {
// return nil, fmt.Errorf("Could not create request")
// }
//
// req.Body = ioutil.NopCloser(bytes.NewReader([]byte(v.Encode())))
//
// if datasource.BasicAuth {
// req.Header.Add("Authorization", util.GetBasicAuthHeader(datasource.User, datasource.Password))
// }
//
// res, err := DefaultClient.Do(req)
//
// if err != nil {
// return nil, err
// }
//
// if res.StatusCode != http.StatusOK {
// return nil, fmt.Errorf("expected httpstatus 200, found %d", res.StatusCode)
// }
//
// response := GraphiteResponse{}
//
// json.NewDecoder(res.Body).Decode(&response)
//
// var timeSeries []*m.TimeSeries
// for _, v := range response {
// timeSeries = append(timeSeries, m.NewTimeSeries(v.Target, v.Datapoints))
// }
//
// return timeSeries, nil
// }
//
// func getTargetFromRule(rule m.AlertRule) string {
// json, _ := simplejson.NewJson([]byte(rule.Query))
//
// return json.Get("target").MustString()
// }
......@@ -5,13 +5,15 @@ import (
"math"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models"
b "github.com/grafana/grafana/pkg/services/alerting/datasources"
"github.com/grafana/grafana/pkg/services/alerting/alertstates"
"github.com/grafana/grafana/pkg/tsdb"
)
type Executor interface {
Execute(rule *m.AlertJob, responseQueue chan *m.AlertResult)
Execute(rule *AlertJob, responseQueue chan *AlertResult)
}
var (
......@@ -22,7 +24,7 @@ var (
type ExecutorImpl struct{}
type compareFn func(float64, float64) bool
type aggregationFn func(*m.TimeSeries) float64
type aggregationFn func(*tsdb.TimeSeries) float64
var operators = map[string]compareFn{
">": func(num1, num2 float64) bool { return num1 > num2 },
......@@ -32,7 +34,7 @@ var operators = map[string]compareFn{
"": func(num1, num2 float64) bool { return false },
}
var aggregator = map[string]aggregationFn{
"avg": func(series *m.TimeSeries) float64 {
"avg": func(series *tsdb.TimeSeries) float64 {
sum := float64(0)
for _, v := range series.Points {
......@@ -41,7 +43,7 @@ var aggregator = map[string]aggregationFn{
return sum / float64(len(series.Points))
},
"sum": func(series *m.TimeSeries) float64 {
"sum": func(series *tsdb.TimeSeries) float64 {
sum := float64(0)
for _, v := range series.Points {
......@@ -50,7 +52,7 @@ var aggregator = map[string]aggregationFn{
return sum
},
"min": func(series *m.TimeSeries) float64 {
"min": func(series *tsdb.TimeSeries) float64 {
min := series.Points[0][0]
for _, v := range series.Points {
......@@ -61,7 +63,7 @@ var aggregator = map[string]aggregationFn{
return min
},
"max": func(series *m.TimeSeries) float64 {
"max": func(series *tsdb.TimeSeries) float64 {
max := series.Points[0][0]
for _, v := range series.Points {
......@@ -72,17 +74,17 @@ var aggregator = map[string]aggregationFn{
return max
},
"mean": func(series *m.TimeSeries) float64 {
"mean": func(series *tsdb.TimeSeries) float64 {
midPosition := int64(math.Floor(float64(len(series.Points)) / float64(2)))
return series.Points[midPosition][0]
},
}
func (executor *ExecutorImpl) Execute(job *m.AlertJob, responseQueue chan *m.AlertResult) {
response, err := b.GetSeries(job)
func (executor *ExecutorImpl) Execute(job *AlertJob, responseQueue chan *AlertResult) {
response, err := executor.GetSeries(job)
if err != nil {
responseQueue <- &m.AlertResult{State: m.AlertStatePending, Id: job.Rule.Id, AlertJob: job}
responseQueue <- &AlertResult{State: alertstates.Pending, Id: job.Rule.Id, AlertJob: job}
}
result := executor.validateRule(job.Rule, response)
......@@ -90,7 +92,26 @@ func (executor *ExecutorImpl) Execute(job *m.AlertJob, responseQueue chan *m.Ale
responseQueue <- result
}
func (executor *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeriesSlice) *m.AlertResult {
func (executor *ExecutorImpl) GetSeries(job *AlertJob) (tsdb.TimeSeriesSlice, error) {
query := &m.GetDataSourceByIdQuery{
Id: job.Rule.DatasourceId,
OrgId: job.Rule.OrgId,
}
err := bus.Dispatch(query)
if err != nil {
return nil, fmt.Errorf("Could not find datasource for %d", job.Rule.DatasourceId)
}
// if query.Result.Type == m.DS_GRAPHITE {
// return GraphiteClient{}.GetSeries(*job, query.Result)
// }
return nil, fmt.Errorf("Grafana does not support alerts for %s", query.Result.Type)
}
func (executor *ExecutorImpl) validateRule(rule AlertRule, series tsdb.TimeSeriesSlice) *AlertResult {
for _, serie := range series {
if aggregator[rule.Aggregator] == nil {
continue
......@@ -102,8 +123,8 @@ func (executor *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeries
log.Trace(resultLogFmt, "Crit", serie.Name, aggValue, rule.CritOperator, rule.CritLevel, critResult)
if critResult {
return &m.AlertResult{
State: m.AlertStateCritical,
return &AlertResult{
State: alertstates.Critical,
Id: rule.Id,
ActualValue: aggValue,
Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
......@@ -114,8 +135,8 @@ func (executor *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeries
var warnResult = warnOperartor(aggValue, rule.CritLevel)
log.Trace(resultLogFmt, "Warn", serie.Name, aggValue, rule.WarnOperator, rule.WarnLevel, warnResult)
if warnResult {
return &m.AlertResult{
State: m.AlertStateWarn,
return &AlertResult{
State: alertstates.Warn,
Id: rule.Id,
Description: fmt.Sprintf(descriptionFmt, aggValue, serie.Name),
ActualValue: aggValue,
......@@ -123,5 +144,5 @@ func (executor *ExecutorImpl) validateRule(rule m.AlertRule, series m.TimeSeries
}
}
return &m.AlertResult{State: m.AlertStateOk, Id: rule.Id, Description: "Alert is OK!"}
return &AlertResult{State: alertstates.Ok, Id: rule.Id, Description: "Alert is OK!"}
}
package alerting
import "github.com/grafana/grafana/pkg/services/alerting/alertstates"
type AlertJob struct {
Offset int64
Delay bool
Running bool
RetryCount int
Rule AlertRule
}
type AlertResult struct {
Id int64
State string
ActualValue float64
Duration float64
Description string
AlertJob *AlertJob
}
func (ar *AlertResult) IsResultIncomplete() bool {
return ar.State == alertstates.Pending
}
type AlertRule struct {
Id int64
OrgId int64
DatasourceId int64
DashboardId int64
PanelId int64
Query string
QueryRefId string
WarnLevel float64
CritLevel float64
WarnOperator string
CritOperator string
Frequency int64
Title string
Description string
QueryRange int
Aggregator string
}
package alerting
import (
"fmt"
"time"
"github.com/Unknwon/log"
"github.com/grafana/grafana/pkg/services/alerting/alertstates"
)
type Scheduler struct {
jobs map[int64]*AlertJob
runQueue chan *AlertJob
responseQueue chan *AlertResult
}
func NewScheduler() *Scheduler {
return &Scheduler{
jobs: make(map[int64]*AlertJob, 0),
runQueue: make(chan *AlertJob, 1000),
responseQueue: make(chan *AlertResult, 1000),
}
}
func (scheduler *Scheduler) dispatch(reader RuleReader) {
reschedule := time.NewTicker(time.Second * 10)
secondTicker := time.NewTicker(time.Second)
scheduler.updateJobs(reader.Fetch)
for {
select {
case <-secondTicker.C:
scheduler.queueJobs()
case <-reschedule.C:
scheduler.updateJobs(reader.Fetch)
}
}
}
func (scheduler *Scheduler) updateJobs(alertRuleFn func() []AlertRule) {
log.Debug("Scheduler: UpdateJobs()")
jobs := make(map[int64]*AlertJob, 0)
rules := alertRuleFn()
for i, rule := range rules {
var job *AlertJob
if scheduler.jobs[rule.Id] != nil {
job = scheduler.jobs[rule.Id]
} else {
job = &AlertJob{
Running: false,
RetryCount: 0,
}
}
job.Rule = rule
job.Offset = int64(i)
jobs[rule.Id] = job
}
log.Debug("Scheduler: Selected %d jobs", len(jobs))
scheduler.jobs = jobs
}
func (scheduler *Scheduler) queueJobs() {
now := time.Now().Unix()
for _, job := range scheduler.jobs {
if now%job.Rule.Frequency == 0 && job.Running == false {
log.Info("Scheduler: Putting job on to run queue: %s", job.Rule.Title)
scheduler.runQueue <- job
}
}
}
func (scheduler *Scheduler) executor(executor Executor) {
for job := range scheduler.runQueue {
//log.Info("Executor: queue length %d", len(this.runQueue))
log.Info("Executor: executing %s", job.Rule.Title)
job.Running = true
scheduler.measureAndExecute(executor, job)
}
}
func (scheduler *Scheduler) handleResponses() {
for response := range scheduler.responseQueue {
log.Info("Response: alert(%d) status(%s) actual(%v) retry(%d)", response.Id, response.State, response.ActualValue, response.AlertJob.RetryCount)
response.AlertJob.Running = false
if response.IsResultIncomplete() {
response.AlertJob.RetryCount++
if response.AlertJob.RetryCount < maxRetries {
scheduler.runQueue <- response.AlertJob
} else {
saveState(&AlertResult{
Id: response.Id,
State: alertstates.Critical,
Description: fmt.Sprintf("Failed to run check after %d retires", maxRetries),
})
}
} else {
response.AlertJob.RetryCount = 0
saveState(response)
}
}
}
func (scheduler *Scheduler) measureAndExecute(exec Executor, job *AlertJob) {
now := time.Now()
responseChan := make(chan *AlertResult, 1)
go exec.Execute(job, responseChan)
select {
case <-time.After(time.Second * 5):
scheduler.responseQueue <- &AlertResult{
Id: job.Rule.Id,
State: alertstates.Pending,
Duration: float64(time.Since(now).Nanoseconds()) / float64(1000000),
AlertJob: job,
}
case result := <-responseChan:
result.Duration = float64(time.Since(now).Nanoseconds()) / float64(1000000)
log.Info("Schedular: exeuction took %vms", result.Duration)
scheduler.responseQueue <- result
}
}
package tsdb
import "errors"
type Batch struct {
DataSourceId int64
Queries QuerySlice
Depends map[string]bool
Done bool
Started bool
}
type BatchSlice []*Batch
func newBatch(dsId int64, queries QuerySlice) *Batch {
return &Batch{
DataSourceId: dsId,
Queries: queries,
Depends: make(map[string]bool),
}
}
func (bg *Batch) process(context *QueryContext) {
executor := getExecutorFor(bg.Queries[0].DataSource)
if executor == nil {
bg.Done = true
result := &BatchResult{
Error: errors.New("Could not find executor for data source type " + bg.Queries[0].DataSource.Type),
QueryResults: make(map[string]*QueryResult),
}
for _, query := range bg.Queries {
result.QueryResults[query.RefId] = &QueryResult{Error: result.Error}
}
context.ResultsChan <- result
return
}
res := executor.Execute(bg.Queries, context)
bg.Done = true
context.ResultsChan <- res
}
func (bg *Batch) addQuery(query *Query) {
bg.Queries = append(bg.Queries, query)
}
func (bg *Batch) allDependenciesAreIn(context *QueryContext) bool {
for key := range bg.Depends {
if _, exists := context.Results[key]; !exists {
return false
}
}
return true
}
func getBatches(req *Request) (BatchSlice, error) {
batches := make(BatchSlice, 0)
for _, query := range req.Queries {
if foundBatch := findMatchingBatchGroup(query, batches); foundBatch != nil {
foundBatch.addQuery(query)
} else {
newBatch := newBatch(query.DataSource.Id, QuerySlice{query})
batches = append(batches, newBatch)
for _, refId := range query.Depends {
for _, batch := range batches {
for _, batchQuery := range batch.Queries {
if batchQuery.RefId == refId {
newBatch.Depends[refId] = true
}
}
}
}
}
}
return batches, nil
}
func findMatchingBatchGroup(query *Query, batches BatchSlice) *Batch {
for _, batch := range batches {
if batch.DataSourceId == query.DataSource.Id {
return batch
}
}
return nil
}
package tsdb
type Executor interface {
Execute(queries QuerySlice, context *QueryContext) *BatchResult
}
var registry map[string]GetExecutorFn
type GetExecutorFn func(dsInfo *DataSourceInfo) Executor
func init() {
registry = make(map[string]GetExecutorFn)
}
func getExecutorFor(dsInfo *DataSourceInfo) Executor {
if fn, exists := registry[dsInfo.Type]; exists {
return fn(dsInfo)
}
return nil
}
func RegisterExecutor(dsType string, fn GetExecutorFn) {
registry[dsType] = fn
}
package tsdb
import "time"
type TimeRange struct {
From time.Time
To time.Time
}
type Request struct {
TimeRange TimeRange
MaxDataPoints int
Queries QuerySlice
}
type Response struct {
BatchTimings []*BatchTiming
Results map[string]*QueryResult
}
type DataSourceInfo struct {
Id int64
Name string
Type string
Url string
Password string
User string
Database string
BasicAuth bool
BasicAuthUser string
BasicAuthPassword string
}
type BatchTiming struct {
TimeElapsed int64
}
type BatchResult struct {
Error error
QueryResults map[string]*QueryResult
Timings *BatchTiming
}
type QueryResult struct {
Error error
RefId string
Series TimeSeriesSlice
}
type TimeSeries struct {
Name string
Points [][2]float64
}
type TimeSeriesSlice []*TimeSeries
func NewTimeSeries(name string, points [][2]float64) *TimeSeries {
return &TimeSeries{
Name: name,
Points: points,
}
}
package tsdb
type Query struct {
RefId string
Query string
Depends []string
DataSource *DataSourceInfo
Results []*TimeSeries
Exclude bool
}
type QuerySlice []*Query
package tsdb
import "sync"
type QueryContext struct {
TimeRange TimeRange
Queries QuerySlice
Results map[string]*QueryResult
ResultsChan chan *BatchResult
Lock sync.RWMutex
BatchWaits sync.WaitGroup
}
func NewQueryContext(queries QuerySlice, timeRange TimeRange) *QueryContext {
return &QueryContext{
TimeRange: timeRange,
Queries: queries,
ResultsChan: make(chan *BatchResult),
Results: make(map[string]*QueryResult),
}
}
package tsdb
func HandleRequest(req *Request) (*Response, error) {
context := NewQueryContext(req.Queries, req.TimeRange)
batches, err := getBatches(req)
if err != nil {
return nil, err
}
currentlyExecuting := 0
for _, batch := range batches {
if len(batch.Depends) == 0 {
currentlyExecuting += 1
batch.Started = true
go batch.process(context)
}
}
response := &Response{}
for currentlyExecuting != 0 {
select {
case batchResult := <-context.ResultsChan:
currentlyExecuting -= 1
response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
for refId, result := range batchResult.QueryResults {
context.Results[refId] = result
}
for _, batch := range batches {
// not interested in started batches
if batch.Started {
continue
}
if batch.allDependenciesAreIn(context) {
currentlyExecuting += 1
batch.Started = true
go batch.process(context)
}
}
}
}
response.Results = context.Results
return response, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment