Commit 8d585766 by Torkel Ödegaard

refactor(tsdb): changed tsdb time series model to use null.Float instead of pointers

parent 63caedb0
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/services/alerting" "github.com/grafana/grafana/pkg/services/alerting"
"gopkg.in/guregu/null.v3"
) )
var ( var (
...@@ -13,13 +14,13 @@ var ( ...@@ -13,13 +14,13 @@ var (
) )
type AlertEvaluator interface { type AlertEvaluator interface {
Eval(reducedValue *float64) bool Eval(reducedValue null.Float) bool
} }
type NoDataEvaluator struct{} type NoDataEvaluator struct{}
func (e *NoDataEvaluator) Eval(reducedValue *float64) bool { func (e *NoDataEvaluator) Eval(reducedValue null.Float) bool {
return reducedValue == nil return reducedValue.Valid == false
} }
type ThresholdEvaluator struct { type ThresholdEvaluator struct {
...@@ -43,16 +44,16 @@ func newThresholdEvaludator(typ string, model *simplejson.Json) (*ThresholdEvalu ...@@ -43,16 +44,16 @@ func newThresholdEvaludator(typ string, model *simplejson.Json) (*ThresholdEvalu
return defaultEval, nil return defaultEval, nil
} }
func (e *ThresholdEvaluator) Eval(reducedValue *float64) bool { func (e *ThresholdEvaluator) Eval(reducedValue null.Float) bool {
if reducedValue == nil { if reducedValue.Valid == false {
return false return false
} }
switch e.Type { switch e.Type {
case "gt": case "gt":
return *reducedValue > e.Threshold return reducedValue.Float64 > e.Threshold
case "lt": case "lt":
return *reducedValue < e.Threshold return reducedValue.Float64 < e.Threshold
} }
return false return false
...@@ -86,16 +87,18 @@ func newRangedEvaluator(typ string, model *simplejson.Json) (*RangedEvaluator, e ...@@ -86,16 +87,18 @@ func newRangedEvaluator(typ string, model *simplejson.Json) (*RangedEvaluator, e
return rangedEval, nil return rangedEval, nil
} }
func (e *RangedEvaluator) Eval(reducedValue *float64) bool { func (e *RangedEvaluator) Eval(reducedValue null.Float) bool {
if reducedValue == nil { if reducedValue.Valid == false {
return false return false
} }
floatValue := reducedValue.Float64
switch e.Type { switch e.Type {
case "within_range": case "within_range":
return (e.Lower < *reducedValue && e.Upper > *reducedValue) || (e.Upper < *reducedValue && e.Lower > *reducedValue) return (e.Lower < floatValue && e.Upper > floatValue) || (e.Upper < floatValue && e.Lower > floatValue)
case "outside_range": case "outside_range":
return (e.Upper < *reducedValue && e.Lower < *reducedValue) || (e.Upper > *reducedValue && e.Lower > *reducedValue) return (e.Upper < floatValue && e.Lower < floatValue) || (e.Upper > floatValue && e.Lower > floatValue)
} }
return false return false
......
...@@ -3,6 +3,8 @@ package conditions ...@@ -3,6 +3,8 @@ package conditions
import ( import (
"testing" "testing"
"gopkg.in/guregu/null.v3"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
) )
...@@ -14,7 +16,7 @@ func evalutorScenario(json string, reducedValue float64, datapoints ...float64) ...@@ -14,7 +16,7 @@ func evalutorScenario(json string, reducedValue float64, datapoints ...float64)
evaluator, err := NewAlertEvaluator(jsonModel) evaluator, err := NewAlertEvaluator(jsonModel)
So(err, ShouldBeNil) So(err, ShouldBeNil)
return evaluator.Eval(&reducedValue) return evaluator.Eval(null.FloatFrom(reducedValue))
} }
func TestEvalutors(t *testing.T) { func TestEvalutors(t *testing.T) {
...@@ -51,6 +53,6 @@ func TestEvalutors(t *testing.T) { ...@@ -51,6 +53,6 @@ func TestEvalutors(t *testing.T) {
evaluator, err := NewAlertEvaluator(jsonModel) evaluator, err := NewAlertEvaluator(jsonModel)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(evaluator.Eval(nil), ShouldBeTrue) So(evaluator.Eval(null.FloatFromPtr(nil)), ShouldBeTrue)
}) })
} }
...@@ -46,21 +46,21 @@ func (c *QueryCondition) Eval(context *alerting.EvalContext) { ...@@ -46,21 +46,21 @@ func (c *QueryCondition) Eval(context *alerting.EvalContext) {
reducedValue := c.Reducer.Reduce(series) reducedValue := c.Reducer.Reduce(series)
evalMatch := c.Evaluator.Eval(reducedValue) evalMatch := c.Evaluator.Eval(reducedValue)
if reducedValue == nil { if reducedValue.Valid == false {
emptySerieCount++ emptySerieCount++
continue continue
} }
if context.IsTestRun { if context.IsTestRun {
context.Logs = append(context.Logs, &alerting.ResultLogEntry{ context.Logs = append(context.Logs, &alerting.ResultLogEntry{
Message: fmt.Sprintf("Condition[%d]: Eval: %v, Metric: %s, Value: %1.3f", c.Index, evalMatch, series.Name, *reducedValue), Message: fmt.Sprintf("Condition[%d]: Eval: %v, Metric: %s, Value: %1.3f", c.Index, evalMatch, series.Name, reducedValue.Float64),
}) })
} }
if evalMatch { if evalMatch {
context.EvalMatches = append(context.EvalMatches, &alerting.EvalMatch{ context.EvalMatches = append(context.EvalMatches, &alerting.EvalMatch{
Metric: series.Name, Metric: series.Name,
Value: *reducedValue, Value: reducedValue.Float64,
}) })
} }
} }
......
...@@ -3,6 +3,8 @@ package conditions ...@@ -3,6 +3,8 @@ package conditions
import ( import (
"testing" "testing"
null "gopkg.in/guregu/null.v3"
"github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
m "github.com/grafana/grafana/pkg/models" m "github.com/grafana/grafana/pkg/models"
...@@ -41,9 +43,8 @@ func TestQueryCondition(t *testing.T) { ...@@ -41,9 +43,8 @@ func TestQueryCondition(t *testing.T) {
}) })
Convey("should fire when avg is above 100", func() { Convey("should fire when avg is above 100", func() {
one := float64(120) points := tsdb.NewTimeSeriesPointsFromArgs(120, 0)
two := float64(0) ctx.series = tsdb.TimeSeriesSlice{tsdb.NewTimeSeries("test1", points)}
ctx.series = tsdb.TimeSeriesSlice{tsdb.NewTimeSeries("test1", [][2]*float64{{&one, &two}})}
ctx.exec() ctx.exec()
So(ctx.result.Error, ShouldBeNil) So(ctx.result.Error, ShouldBeNil)
...@@ -51,9 +52,8 @@ func TestQueryCondition(t *testing.T) { ...@@ -51,9 +52,8 @@ func TestQueryCondition(t *testing.T) {
}) })
Convey("Should not fire when avg is below 100", func() { Convey("Should not fire when avg is below 100", func() {
one := float64(90) points := tsdb.NewTimeSeriesPointsFromArgs(90, 0)
two := float64(0) ctx.series = tsdb.TimeSeriesSlice{tsdb.NewTimeSeries("test1", points)}
ctx.series = tsdb.TimeSeriesSlice{tsdb.NewTimeSeries("test1", [][2]*float64{{&one, &two}})}
ctx.exec() ctx.exec()
So(ctx.result.Error, ShouldBeNil) So(ctx.result.Error, ShouldBeNil)
...@@ -61,11 +61,9 @@ func TestQueryCondition(t *testing.T) { ...@@ -61,11 +61,9 @@ func TestQueryCondition(t *testing.T) {
}) })
Convey("Should fire if only first serie matches", func() { Convey("Should fire if only first serie matches", func() {
one := float64(120)
two := float64(0)
ctx.series = tsdb.TimeSeriesSlice{ ctx.series = tsdb.TimeSeriesSlice{
tsdb.NewTimeSeries("test1", [][2]*float64{{&one, &two}}), tsdb.NewTimeSeries("test1", tsdb.NewTimeSeriesPointsFromArgs(120, 0)),
tsdb.NewTimeSeries("test2", [][2]*float64{{&two, &two}}), tsdb.NewTimeSeries("test2", tsdb.NewTimeSeriesPointsFromArgs(0, 0)),
} }
ctx.exec() ctx.exec()
...@@ -76,8 +74,8 @@ func TestQueryCondition(t *testing.T) { ...@@ -76,8 +74,8 @@ func TestQueryCondition(t *testing.T) {
Convey("Empty series", func() { Convey("Empty series", func() {
Convey("Should set NoDataFound both series are empty", func() { Convey("Should set NoDataFound both series are empty", func() {
ctx.series = tsdb.TimeSeriesSlice{ ctx.series = tsdb.TimeSeriesSlice{
tsdb.NewTimeSeries("test1", [][2]*float64{}), tsdb.NewTimeSeries("test1", tsdb.NewTimeSeriesPointsFromArgs()),
tsdb.NewTimeSeries("test2", [][2]*float64{}), tsdb.NewTimeSeries("test2", tsdb.NewTimeSeriesPointsFromArgs()),
} }
ctx.exec() ctx.exec()
...@@ -86,10 +84,9 @@ func TestQueryCondition(t *testing.T) { ...@@ -86,10 +84,9 @@ func TestQueryCondition(t *testing.T) {
}) })
Convey("Should set NoDataFound both series contains null", func() { Convey("Should set NoDataFound both series contains null", func() {
one := float64(120)
ctx.series = tsdb.TimeSeriesSlice{ ctx.series = tsdb.TimeSeriesSlice{
tsdb.NewTimeSeries("test1", [][2]*float64{{nil, &one}}), tsdb.NewTimeSeries("test1", tsdb.TimeSeriesPoints{tsdb.TimePoint{null.FloatFromPtr(nil), null.FloatFrom(0)}}),
tsdb.NewTimeSeries("test2", [][2]*float64{{nil, &one}}), tsdb.NewTimeSeries("test2", tsdb.TimeSeriesPoints{tsdb.TimePoint{null.FloatFromPtr(nil), null.FloatFrom(0)}}),
} }
ctx.exec() ctx.exec()
...@@ -98,11 +95,9 @@ func TestQueryCondition(t *testing.T) { ...@@ -98,11 +95,9 @@ func TestQueryCondition(t *testing.T) {
}) })
Convey("Should not set NoDataFound if one serie is empty", func() { Convey("Should not set NoDataFound if one serie is empty", func() {
one := float64(120)
two := float64(0)
ctx.series = tsdb.TimeSeriesSlice{ ctx.series = tsdb.TimeSeriesSlice{
tsdb.NewTimeSeries("test1", [][2]*float64{}), tsdb.NewTimeSeries("test1", tsdb.NewTimeSeriesPointsFromArgs()),
tsdb.NewTimeSeries("test2", [][2]*float64{{&one, &two}}), tsdb.NewTimeSeries("test2", tsdb.NewTimeSeriesPointsFromArgs(120, 0)),
} }
ctx.exec() ctx.exec()
......
...@@ -4,19 +4,20 @@ import ( ...@@ -4,19 +4,20 @@ import (
"math" "math"
"github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/tsdb"
"gopkg.in/guregu/null.v3"
) )
type QueryReducer interface { type QueryReducer interface {
Reduce(timeSeries *tsdb.TimeSeries) *float64 Reduce(timeSeries *tsdb.TimeSeries) null.Float
} }
type SimpleReducer struct { type SimpleReducer struct {
Type string Type string
} }
func (s *SimpleReducer) Reduce(series *tsdb.TimeSeries) *float64 { func (s *SimpleReducer) Reduce(series *tsdb.TimeSeries) null.Float {
if len(series.Points) == 0 { if len(series.Points) == 0 {
return nil return null.FloatFromPtr(nil)
} }
value := float64(0) value := float64(0)
...@@ -25,36 +26,36 @@ func (s *SimpleReducer) Reduce(series *tsdb.TimeSeries) *float64 { ...@@ -25,36 +26,36 @@ func (s *SimpleReducer) Reduce(series *tsdb.TimeSeries) *float64 {
switch s.Type { switch s.Type {
case "avg": case "avg":
for _, point := range series.Points { for _, point := range series.Points {
if point[0] != nil { if point[0].Valid {
value += *point[0] value += point[0].Float64
allNull = false allNull = false
} }
} }
value = value / float64(len(series.Points)) value = value / float64(len(series.Points))
case "sum": case "sum":
for _, point := range series.Points { for _, point := range series.Points {
if point[0] != nil { if point[0].Valid {
value += *point[0] value += point[0].Float64
allNull = false allNull = false
} }
} }
case "min": case "min":
value = math.MaxFloat64 value = math.MaxFloat64
for _, point := range series.Points { for _, point := range series.Points {
if point[0] != nil { if point[0].Valid {
allNull = false allNull = false
if value > *point[0] { if value > point[0].Float64 {
value = *point[0] value = point[0].Float64
} }
} }
} }
case "max": case "max":
value = -math.MaxFloat64 value = -math.MaxFloat64
for _, point := range series.Points { for _, point := range series.Points {
if point[0] != nil { if point[0].Valid {
allNull = false allNull = false
if value < *point[0] { if value < point[0].Float64 {
value = *point[0] value = point[0].Float64
} }
} }
} }
...@@ -64,10 +65,10 @@ func (s *SimpleReducer) Reduce(series *tsdb.TimeSeries) *float64 { ...@@ -64,10 +65,10 @@ func (s *SimpleReducer) Reduce(series *tsdb.TimeSeries) *float64 {
} }
if allNull { if allNull {
return nil return null.FloatFromPtr(nil)
} }
return &value return null.FloatFrom(value)
} }
func NewSimpleReducer(typ string) *SimpleReducer { func NewSimpleReducer(typ string) *SimpleReducer {
......
...@@ -10,44 +10,41 @@ import ( ...@@ -10,44 +10,41 @@ import (
func TestSimpleReducer(t *testing.T) { func TestSimpleReducer(t *testing.T) {
Convey("Test simple reducer by calculating", t, func() { Convey("Test simple reducer by calculating", t, func() {
Convey("avg", func() { Convey("avg", func() {
result := *testReducer("avg", 1, 2, 3) result := testReducer("avg", 1, 2, 3)
So(result, ShouldEqual, float64(2)) So(result, ShouldEqual, float64(2))
}) })
Convey("sum", func() { Convey("sum", func() {
result := *testReducer("sum", 1, 2, 3) result := testReducer("sum", 1, 2, 3)
So(result, ShouldEqual, float64(6)) So(result, ShouldEqual, float64(6))
}) })
Convey("min", func() { Convey("min", func() {
result := *testReducer("min", 3, 2, 1) result := testReducer("min", 3, 2, 1)
So(result, ShouldEqual, float64(1)) So(result, ShouldEqual, float64(1))
}) })
Convey("max", func() { Convey("max", func() {
result := *testReducer("max", 1, 2, 3) result := testReducer("max", 1, 2, 3)
So(result, ShouldEqual, float64(3)) So(result, ShouldEqual, float64(3))
}) })
Convey("count", func() { Convey("count", func() {
result := *testReducer("count", 1, 2, 3000) result := testReducer("count", 1, 2, 3000)
So(result, ShouldEqual, float64(3)) So(result, ShouldEqual, float64(3))
}) })
}) })
} }
func testReducer(typ string, datapoints ...float64) *float64 { func testReducer(typ string, datapoints ...float64) float64 {
reducer := NewSimpleReducer(typ) reducer := NewSimpleReducer(typ)
var timeserie [][2]*float64 series := &tsdb.TimeSeries{
dummieTimestamp := float64(521452145) Name: "test time serie",
}
for idx := range datapoints { for idx := range datapoints {
timeserie = append(timeserie, [2]*float64{&datapoints[idx], &dummieTimestamp}) series.Points = append(series.Points, tsdb.NewTimePoint(datapoints[idx], 1234134))
} }
tsdb := &tsdb.TimeSeries{ return reducer.Reduce(series).Float64
Name: "test time serie",
Points: timeserie,
}
return reducer.Reduce(tsdb)
} }
...@@ -80,6 +80,7 @@ func (e *GraphiteExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryC ...@@ -80,6 +80,7 @@ func (e *GraphiteExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryC
result.QueryResults = make(map[string]*tsdb.QueryResult) result.QueryResults = make(map[string]*tsdb.QueryResult)
queryRes := &tsdb.QueryResult{} queryRes := &tsdb.QueryResult{}
for _, series := range data { for _, series := range data {
queryRes.Series = append(queryRes.Series, &tsdb.TimeSeries{ queryRes.Series = append(queryRes.Series, &tsdb.TimeSeries{
Name: series.Target, Name: series.Target,
......
package graphite package graphite
import "github.com/grafana/grafana/pkg/tsdb"
type TargetResponseDTO struct { type TargetResponseDTO struct {
Target string `json:"target"` Target string `json:"target"`
DataPoints [][2]*float64 `json:"datapoints"` DataPoints tsdb.TimeSeriesPoints `json:"datapoints"`
} }
package tsdb package tsdb
import "github.com/grafana/grafana/pkg/components/simplejson" import (
"github.com/grafana/grafana/pkg/components/simplejson"
"gopkg.in/guregu/null.v3"
)
type Query struct { type Query struct {
RefId string RefId string
...@@ -56,12 +59,28 @@ type QueryResult struct { ...@@ -56,12 +59,28 @@ type QueryResult struct {
type TimeSeries struct { type TimeSeries struct {
Name string `json:"name"` Name string `json:"name"`
Points [][2]*float64 `json:"points"` Points TimeSeriesPoints `json:"points"`
} }
type TimePoint [2]null.Float
type TimeSeriesPoints []TimePoint
type TimeSeriesSlice []*TimeSeries type TimeSeriesSlice []*TimeSeries
func NewTimeSeries(name string, points [][2]*float64) *TimeSeries { func NewTimePoint(value float64, timestamp float64) TimePoint {
return TimePoint{null.FloatFrom(value), null.FloatFrom(timestamp)}
}
func NewTimeSeriesPointsFromArgs(values ...float64) TimeSeriesPoints {
points := make(TimeSeriesPoints, 0)
for i := 0; i < len(values); i += 2 {
points = append(points, NewTimePoint(values[i], values[i+1]))
}
return points
}
func NewTimeSeries(name string, points TimeSeriesPoints) *TimeSeries {
return &TimeSeries{ return &TimeSeries{
Name: name, Name: name,
Points: points, Points: points,
......
...@@ -140,17 +140,15 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb ...@@ -140,17 +140,15 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb
} }
for _, v := range data { for _, v := range data {
var points [][2]*float64 series := tsdb.TimeSeries{
Name: formatLegend(v.Metric, query),
}
for _, k := range v.Values { for _, k := range v.Values {
timestamp := float64(k.Timestamp) series.Points = append(series.Points, tsdb.NewTimePoint(float64(k.Value), float64(k.Timestamp.Unix()*1000)))
val := float64(k.Value)
points = append(points, [2]*float64{&val, &timestamp})
} }
queryRes.Series = append(queryRes.Series, &tsdb.TimeSeries{ queryRes.Series = append(queryRes.Series, &series)
Name: formatLegend(v.Metric, query),
Points: points,
})
} }
queryResults["A"] = queryRes queryResults["A"] = queryRes
......
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"math/rand" "math/rand"
"time" "time"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/tsdb"
) )
...@@ -21,7 +20,7 @@ var ScenarioRegistry map[string]*Scenario ...@@ -21,7 +20,7 @@ var ScenarioRegistry map[string]*Scenario
func init() { func init() {
ScenarioRegistry = make(map[string]*Scenario) ScenarioRegistry = make(map[string]*Scenario)
logger := log.New("tsdb.testdata") //logger := log.New("tsdb.testdata")
registerScenario(&Scenario{ registerScenario(&Scenario{
Id: "random_walk", Id: "random_walk",
...@@ -33,13 +32,11 @@ func init() { ...@@ -33,13 +32,11 @@ func init() {
series := newSeriesForQuery(query) series := newSeriesForQuery(query)
points := make([][2]*float64, 0) points := make(tsdb.TimeSeriesPoints, 0)
walker := rand.Float64() * 100 walker := rand.Float64() * 100
for i := int64(0); i < 10000 && timeWalkerMs < to; i++ { for i := int64(0); i < 10000 && timeWalkerMs < to; i++ {
timestamp := float64(timeWalkerMs) points = append(points, tsdb.NewTimePoint(walker, float64(timeWalkerMs)))
val := float64(walker)
points = append(points, [2]*float64{&val, &timestamp})
walker += rand.Float64() - 0.5 walker += rand.Float64() - 0.5
timeWalkerMs += query.IntervalMs timeWalkerMs += query.IntervalMs
...@@ -72,12 +69,9 @@ func init() { ...@@ -72,12 +69,9 @@ func init() {
series := newSeriesForQuery(query) series := newSeriesForQuery(query)
outsideTime := context.TimeRange.MustGetFrom().Add(-1*time.Hour).Unix() * 1000 outsideTime := context.TimeRange.MustGetFrom().Add(-1*time.Hour).Unix() * 1000
timestamp := float64(outsideTime) series.Points = append(series.Points, tsdb.NewTimePoint(10, float64(outsideTime)))
logger.Info("time", "from", timestamp)
val := float64(10)
series.Points = append(series.Points, [2]*float64{&val, &timestamp})
queryRes.Series = append(queryRes.Series, series) queryRes.Series = append(queryRes.Series, series)
return queryRes return queryRes
}, },
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment