Commit 0254a29e by Sven Klemm Committed by Marcus Efraimsson

Interpolate $__interval in backend for alerting with sql datasources (#13156)

add support for interpolate $__interval and  $__interval_ms in sql datasources
parent bae56071
......@@ -138,7 +138,7 @@ func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte,
}
body := string(reqBody)
body = strings.Replace(body, "$__interval_ms", strconv.FormatInt(r.interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
body = strings.Replace(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10), -1)
body = strings.Replace(body, "$__interval", r.interval.Text, -1)
payload.WriteString(body + "\n")
......
......@@ -4,7 +4,6 @@ import (
"fmt"
"strconv"
"strings"
"time"
"regexp"
......@@ -34,7 +33,7 @@ func (query *Query) Build(queryContext *tsdb.TsdbQuery) (string, error) {
res = strings.Replace(res, "$timeFilter", query.renderTimeFilter(queryContext), -1)
res = strings.Replace(res, "$interval", interval.Text, -1)
res = strings.Replace(res, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1)
res = strings.Replace(res, "$__interval_ms", strconv.FormatInt(interval.Milliseconds(), 10), -1)
res = strings.Replace(res, "$__interval", interval.Text, -1)
return res, nil
}
......
......@@ -49,6 +49,10 @@ func NewIntervalCalculator(opt *IntervalOptions) *intervalCalculator {
return calc
}
func (i *Interval) Milliseconds() int64 {
return i.Value.Nanoseconds() / int64(time.Millisecond)
}
func (ic *intervalCalculator) Calculate(timerange *TimeRange, minInterval time.Duration) Interval {
to := timerange.MustGetTo().UnixNano()
from := timerange.MustGetFrom().UnixNano()
......
......@@ -13,12 +13,13 @@ const rsIdentifier = `([_a-zA-Z0-9]+)`
const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
type msSqlMacroEngine struct {
*tsdb.SqlMacroEngineBase
timeRange *tsdb.TimeRange
query *tsdb.Query
}
func newMssqlMacroEngine() tsdb.SqlMacroEngine {
return &msSqlMacroEngine{}
return &msSqlMacroEngine{SqlMacroEngineBase: tsdb.NewSqlMacroEngineBase()}
}
func (m *msSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
......@@ -27,7 +28,7 @@ func (m *msSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRa
rExp, _ := regexp.Compile(sExpr)
var macroError error
sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
sql = m.ReplaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
args := strings.Split(groups[2], ",")
for i, arg := range args {
args[i] = strings.Trim(arg, " ")
......@@ -47,23 +48,6 @@ func (m *msSqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRa
return sql, nil
}
func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string {
result := ""
lastIndex := 0
for _, v := range re.FindAllSubmatchIndex([]byte(str), -1) {
groups := []string{}
for i := 0; i < len(v); i += 2 {
groups = append(groups, str[v[i]:v[i+1]])
}
result += str[lastIndex:v[0]] + repl(groups)
lastIndex = v[1]
}
return result + str[lastIndex:]
}
func (m *msSqlMacroEngine) evaluateMacro(name string, args []string) (string, error) {
switch name {
case "__time":
......
......@@ -35,6 +35,11 @@ func TestMSSQL(t *testing.T) {
return x, nil
}
origInterpolate := tsdb.Interpolate
tsdb.Interpolate = func(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
return sql, nil
}
endpoint, err := newMssqlQueryEndpoint(&models.DataSource{
JsonData: simplejson.New(),
SecureJsonData: securejsondata.SecureJsonData{},
......@@ -47,6 +52,7 @@ func TestMSSQL(t *testing.T) {
Reset(func() {
sess.Close()
tsdb.NewXormEngine = origXormEngine
tsdb.Interpolate = origInterpolate
})
Convey("Given a table with different native data types", func() {
......@@ -295,6 +301,40 @@ func TestMSSQL(t *testing.T) {
})
Convey("When doing a metric query using timeGroup and $__interval", func() {
mockInterpolate := tsdb.Interpolate
tsdb.Interpolate = origInterpolate
Reset(func() {
tsdb.Interpolate = mockInterpolate
})
Convey("Should replace $__interval", func() {
query := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
DataSource: &models.DataSource{},
Model: simplejson.NewFromAny(map[string]interface{}{
"rawSql": "SELECT $__timeGroup(time, $__interval) AS time, avg(value) as value FROM metric GROUP BY $__timeGroup(time, $__interval) ORDER BY 1",
"format": "time_series",
}),
RefId: "A",
},
},
TimeRange: &tsdb.TimeRange{
From: fmt.Sprintf("%v", fromStart.Unix()*1000),
To: fmt.Sprintf("%v", fromStart.Add(30*time.Minute).Unix()*1000),
},
}
resp, err := endpoint.Query(nil, nil, query)
So(err, ShouldBeNil)
queryResult := resp.Results["A"]
So(queryResult.Error, ShouldBeNil)
So(queryResult.Meta.Get("sql").MustString(), ShouldEqual, "SELECT FLOOR(DATEDIFF(second, '1970-01-01', time)/60)*60 AS time, avg(value) as value FROM metric GROUP BY FLOOR(DATEDIFF(second, '1970-01-01', time)/60)*60 ORDER BY 1")
})
})
Convey("When doing a metric query using timeGroup with float fill enabled", func() {
query := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
......
......@@ -9,17 +9,17 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
//const rsString = `(?:"([^"]*)")`;
const rsIdentifier = `([_a-zA-Z0-9]+)`
const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
type mySqlMacroEngine struct {
*tsdb.SqlMacroEngineBase
timeRange *tsdb.TimeRange
query *tsdb.Query
}
func newMysqlMacroEngine() tsdb.SqlMacroEngine {
return &mySqlMacroEngine{}
return &mySqlMacroEngine{SqlMacroEngineBase: tsdb.NewSqlMacroEngineBase()}
}
func (m *mySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
......@@ -28,7 +28,7 @@ func (m *mySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRa
rExp, _ := regexp.Compile(sExpr)
var macroError error
sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
sql = m.ReplaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
args := strings.Split(groups[2], ",")
for i, arg := range args {
args[i] = strings.Trim(arg, " ")
......@@ -48,23 +48,6 @@ func (m *mySqlMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRa
return sql, nil
}
func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string {
result := ""
lastIndex := 0
for _, v := range re.FindAllSubmatchIndex([]byte(str), -1) {
groups := []string{}
for i := 0; i < len(v); i += 2 {
groups = append(groups, str[v[i]:v[i+1]])
}
result += str[lastIndex:v[0]] + repl(groups)
lastIndex = v[1]
}
return result + str[lastIndex:]
}
func (m *mySqlMacroEngine) evaluateMacro(name string, args []string) (string, error) {
switch name {
case "__timeEpoch", "__time":
......
......@@ -42,6 +42,11 @@ func TestMySQL(t *testing.T) {
return x, nil
}
origInterpolate := tsdb.Interpolate
tsdb.Interpolate = func(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
return sql, nil
}
endpoint, err := newMysqlQueryEndpoint(&models.DataSource{
JsonData: simplejson.New(),
SecureJsonData: securejsondata.SecureJsonData{},
......@@ -54,6 +59,7 @@ func TestMySQL(t *testing.T) {
Reset(func() {
sess.Close()
tsdb.NewXormEngine = origXormEngine
tsdb.Interpolate = origInterpolate
})
Convey("Given a table with different native data types", func() {
......@@ -295,6 +301,40 @@ func TestMySQL(t *testing.T) {
})
Convey("When doing a metric query using timeGroup and $__interval", func() {
mockInterpolate := tsdb.Interpolate
tsdb.Interpolate = origInterpolate
Reset(func() {
tsdb.Interpolate = mockInterpolate
})
Convey("Should replace $__interval", func() {
query := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
DataSource: &models.DataSource{},
Model: simplejson.NewFromAny(map[string]interface{}{
"rawSql": "SELECT $__timeGroup(time, $__interval) AS time, avg(value) as value FROM metric GROUP BY 1 ORDER BY 1",
"format": "time_series",
}),
RefId: "A",
},
},
TimeRange: &tsdb.TimeRange{
From: fmt.Sprintf("%v", fromStart.Unix()*1000),
To: fmt.Sprintf("%v", fromStart.Add(30*time.Minute).Unix()*1000),
},
}
resp, err := endpoint.Query(nil, nil, query)
So(err, ShouldBeNil)
queryResult := resp.Results["A"]
So(queryResult.Error, ShouldBeNil)
So(queryResult.Meta.Get("sql").MustString(), ShouldEqual, "SELECT UNIX_TIMESTAMP(time) DIV 60 * 60 AS time, avg(value) as value FROM metric GROUP BY 1 ORDER BY 1")
})
})
Convey("When doing a metric query using timeGroup with value fill enabled", func() {
query := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
......
......@@ -9,18 +9,21 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
//const rsString = `(?:"([^"]*)")`;
const rsIdentifier = `([_a-zA-Z0-9]+)`
const sExpr = `\$` + rsIdentifier + `\(([^\)]*)\)`
type postgresMacroEngine struct {
*tsdb.SqlMacroEngineBase
timeRange *tsdb.TimeRange
query *tsdb.Query
timescaledb bool
}
func newPostgresMacroEngine(timescaledb bool) tsdb.SqlMacroEngine {
return &postgresMacroEngine{timescaledb: timescaledb}
return &postgresMacroEngine{
SqlMacroEngineBase: tsdb.NewSqlMacroEngineBase(),
timescaledb: timescaledb,
}
}
func (m *postgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
......@@ -29,7 +32,7 @@ func (m *postgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.Tim
rExp, _ := regexp.Compile(sExpr)
var macroError error
sql = replaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
sql = m.ReplaceAllStringSubmatchFunc(rExp, sql, func(groups []string) string {
// detect if $__timeGroup is supposed to add AS time for pre 5.3 compatibility
// if there is a ',' directly after the macro call $__timeGroup is probably used
......@@ -66,23 +69,6 @@ func (m *postgresMacroEngine) Interpolate(query *tsdb.Query, timeRange *tsdb.Tim
return sql, nil
}
func replaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string {
result := ""
lastIndex := 0
for _, v := range re.FindAllSubmatchIndex([]byte(str), -1) {
groups := []string{}
for i := 0; i < len(v); i += 2 {
groups = append(groups, str[v[i]:v[i+1]])
}
result += str[lastIndex:v[0]] + repl(groups)
lastIndex = v[1]
}
return result + str[lastIndex:]
}
func (m *postgresMacroEngine) evaluateMacro(name string, args []string) (string, error) {
switch name {
case "__time":
......
......@@ -43,6 +43,11 @@ func TestPostgres(t *testing.T) {
return x, nil
}
origInterpolate := tsdb.Interpolate
tsdb.Interpolate = func(query *tsdb.Query, timeRange *tsdb.TimeRange, sql string) (string, error) {
return sql, nil
}
endpoint, err := newPostgresQueryEndpoint(&models.DataSource{
JsonData: simplejson.New(),
SecureJsonData: securejsondata.SecureJsonData{},
......@@ -55,6 +60,7 @@ func TestPostgres(t *testing.T) {
Reset(func() {
sess.Close()
tsdb.NewXormEngine = origXormEngine
tsdb.Interpolate = origInterpolate
})
Convey("Given a table with different native data types", func() {
......@@ -222,6 +228,40 @@ func TestPostgres(t *testing.T) {
}
})
Convey("When doing a metric query using timeGroup and $__interval", func() {
mockInterpolate := tsdb.Interpolate
tsdb.Interpolate = origInterpolate
Reset(func() {
tsdb.Interpolate = mockInterpolate
})
Convey("Should replace $__interval", func() {
query := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
DataSource: &models.DataSource{},
Model: simplejson.NewFromAny(map[string]interface{}{
"rawSql": "SELECT $__timeGroup(time, $__interval) AS time, avg(value) as value FROM metric GROUP BY 1 ORDER BY 1",
"format": "time_series",
}),
RefId: "A",
},
},
TimeRange: &tsdb.TimeRange{
From: fmt.Sprintf("%v", fromStart.Unix()*1000),
To: fmt.Sprintf("%v", fromStart.Add(30*time.Minute).Unix()*1000),
},
}
resp, err := endpoint.Query(nil, nil, query)
So(err, ShouldBeNil)
queryResult := resp.Results["A"]
So(queryResult.Error, ShouldBeNil)
So(queryResult.Meta.Get("sql").MustString(), ShouldEqual, "SELECT floor(extract(epoch from time)/60)*60 AS time, avg(value) as value FROM metric GROUP BY 1 ORDER BY 1")
})
})
Convey("When doing a metric query using timeGroup with NULL fill enabled", func() {
query := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
......
......@@ -6,6 +6,7 @@ import (
"database/sql"
"fmt"
"math"
"regexp"
"strconv"
"strings"
"sync"
......@@ -43,6 +44,8 @@ var engineCache = engineCacheType{
versions: make(map[int64]int),
}
var sqlIntervalCalculator = NewIntervalCalculator(nil)
var NewXormEngine = func(driverName string, connectionString string) (*xorm.Engine, error) {
return xorm.NewEngine(driverName, connectionString)
}
......@@ -126,7 +129,15 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource,
queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
result.Results[query.RefId] = queryResult
rawSQL, err := e.macroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSQL)
// global substitutions
rawSQL, err := Interpolate(query, tsdbQuery.TimeRange, rawSQL)
if err != nil {
queryResult.Error = err
continue
}
// datasource specific substitutions
rawSQL, err = e.macroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSQL)
if err != nil {
queryResult.Error = err
continue
......@@ -163,6 +174,20 @@ func (e *sqlQueryEndpoint) Query(ctx context.Context, dsInfo *models.DataSource,
return result, nil
}
// global macros/substitutions for all sql datasources
var Interpolate = func(query *Query, timeRange *TimeRange, sql string) (string, error) {
minInterval, err := GetIntervalFrom(query.DataSource, query.Model, time.Second*60)
if err != nil {
return sql, nil
}
interval := sqlIntervalCalculator.Calculate(timeRange, minInterval)
sql = strings.Replace(sql, "$__interval_ms", strconv.FormatInt(interval.Milliseconds(), 10), -1)
sql = strings.Replace(sql, "$__interval", interval.Text, -1)
return sql, nil
}
func (e *sqlQueryEndpoint) transformToTable(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error {
columnNames, err := rows.Columns()
columnCount := len(columnNames)
......@@ -589,3 +614,26 @@ func SetupFillmode(query *Query, interval time.Duration, fillmode string) error
return nil
}
type SqlMacroEngineBase struct{}
func NewSqlMacroEngineBase() *SqlMacroEngineBase {
return &SqlMacroEngineBase{}
}
func (m *SqlMacroEngineBase) ReplaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string {
result := ""
lastIndex := 0
for _, v := range re.FindAllSubmatchIndex([]byte(str), -1) {
groups := []string{}
for i := 0; i < len(v); i += 2 {
groups = append(groups, str[v[i]:v[i+1]])
}
result += str[lastIndex:v[0]] + repl(groups)
lastIndex = v[1]
}
return result + str[lastIndex:]
}
......@@ -5,6 +5,8 @@ import (
"time"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
. "github.com/smartystreets/goconvey/convey"
)
......@@ -14,6 +16,35 @@ func TestSqlEngine(t *testing.T) {
dt := time.Date(2018, 3, 14, 21, 20, 6, int(527345*time.Microsecond), time.UTC)
earlyDt := time.Date(1970, 3, 14, 21, 20, 6, int(527345*time.Microsecond), time.UTC)
Convey("Given a time range between 2018-04-12 00:00 and 2018-04-12 00:05", func() {
from := time.Date(2018, 4, 12, 18, 0, 0, 0, time.UTC)
to := from.Add(5 * time.Minute)
timeRange := NewFakeTimeRange("5m", "now", to)
query := &Query{DataSource: &models.DataSource{}, Model: simplejson.New()}
Convey("interpolate $__interval", func() {
sql, err := Interpolate(query, timeRange, "select $__interval ")
So(err, ShouldBeNil)
So(sql, ShouldEqual, "select 1m ")
})
Convey("interpolate $__interval in $__timeGroup", func() {
sql, err := Interpolate(query, timeRange, "select $__timeGroupAlias(time,$__interval)")
So(err, ShouldBeNil)
So(sql, ShouldEqual, "select $__timeGroupAlias(time,1m)")
})
Convey("interpolate $__interval_ms", func() {
sql, err := Interpolate(query, timeRange, "select $__interval_ms ")
So(err, ShouldBeNil)
So(sql, ShouldEqual, "select 60000 ")
})
})
Convey("Given row values with time.Time as time columns", func() {
var nilPointer *time.Time
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment