Commit 77400cef by Marcus Efraimsson

elasticsearch: refactor and cleanup

Move time series query logic to a specific file.
Remove the model parser, move its logic into the time series query file and add a parser test.
parent 1324a67c
package elasticsearch
import (
	"context"
	"fmt"
	"net/http"
	"net/url"
	"path"
	"strings"
	"time"

	"github.com/grafana/grafana/pkg/log"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/tsdb"
)
type ElasticsearchExecutor struct{}
var (
glog log.Logger
@@ -29,14 +22,7 @@ var (
)
func NewElasticsearchExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
	return &ElasticsearchExecutor{}, nil
}
func init() {
@@ -46,84 +32,11 @@ func init() {
}
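// Query handles an Elasticsearch time series query request by delegating to
// executeTimeSeriesQuery.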
func (e *ElasticsearchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
	if len(tsdbQuery.Queries) == 0 {
		return nil, fmt.Errorf("query contains no queries")
	}

	return e.executeTimeSeriesQuery(ctx, dsInfo, tsdbQuery)
}
func (e *ElasticsearchExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) {
......
@@ -5,12 +5,14 @@ import (
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/tsdb"
	"github.com/leibowitz/moment"
)
var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom",
@@ -22,14 +24,12 @@ type Query struct {
	RawQuery   string       `json:"query"`
	BucketAggs []*BucketAgg `json:"bucketAggs"`
	Metrics    []*Metric    `json:"metrics"`
	Alias      string       `json:"alias"`
	Interval   time.Duration
}
func (q *Query) Build(queryContext *tsdb.TsdbQuery, dsInfo *models.DataSource) (string, error) {
	var req Request
	req.Size = 0
	q.renderReqQuery(&req)
@@ -45,6 +45,7 @@ func (q *Query) Build(queryContext *tsdb.TsdbQuery, dsInfo *models.DataSource) (
	reqBytes, err := json.Marshal(req)
	reqHeader := getRequestHeader(queryContext.TimeRange, dsInfo)

	payload := bytes.Buffer{}
	payload.WriteString(reqHeader.String() + "\n")
	payload.WriteString(string(reqBytes) + "\n")
	return q.renderTemplate(payload.String(), queryContext)
@@ -235,3 +236,61 @@ func (q *Query) renderTemplate(payload string, queryContext *tsdb.TsdbQuery) (st
payload = strings.Replace(payload, "$__interval", interval.Text, -1)
return payload, nil
}
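// getRequestHeader builds the multisearch header for a query. Elasticsearch
// versions before 5 use the deprecated "count" search type; from version 5.6
// (stored as 56 in the datasource settings) the maximum number of concurrent
// shard requests can also be configured.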
func getRequestHeader(timeRange *tsdb.TimeRange, dsInfo *models.DataSource) *QueryHeader {
var header QueryHeader
esVersion := dsInfo.JsonData.Get("esVersion").MustInt()
searchType := "query_then_fetch"
if esVersion < 5 {
searchType = "count"
}
header.SearchType = searchType
header.IgnoreUnavailable = true
header.Index = getIndexList(dsInfo.Database, dsInfo.JsonData.Get("interval").MustString(), timeRange)
if esVersion >= 56 {
header.MaxConcurrentShardRequests = dsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt()
}
return &header
}
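// A sketch of a resulting header line for an Elasticsearch 5.x datasource
// (field names assumed from QueryHeader's JSON tags, which are not shown here):
//
//	{"search_type":"query_then_fetch","ignore_unavailable":true,"index":"logstash-2017.07.14"}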
func getIndexList(pattern string, interval string, timeRange *tsdb.TimeRange) string {
if interval == "" {
return pattern
}
var indexes []string
indexParts := strings.Split(strings.TrimLeft(pattern, "["), "]")
indexBase := indexParts[0]
if len(indexParts) <= 1 {
return pattern
}
indexDateFormat := indexParts[1]
start := moment.NewMoment(timeRange.MustGetFrom())
end := moment.NewMoment(timeRange.MustGetTo())
indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat)))
for start.IsBefore(*end) {
switch interval {
case "Hourly":
start = start.AddHours(1)
case "Daily":
start = start.AddDay()
case "Weekly":
start = start.AddWeeks(1)
case "Monthly":
start = start.AddMonths(1)
case "Yearly":
start = start.AddYears(1)
}
indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat)))
}
return strings.Join(indexes, ",")
}
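// For example (taken from the tests below), the pattern "[logstash-]YYYY.MM.DD"
// with a "Daily" interval and a 48 hour time range expands to
// "logstash-2017.07.14,logstash-2017.07.15,logstash-2017.07.16".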
@@ -3,14 +3,15 @@ package elasticsearch
import (
	"encoding/json"
	"fmt"
	"reflect"
	"strconv"
	"strings"
	"testing"

	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/tsdb"
	. "github.com/smartystreets/goconvey/convey"
)
func testElasticSearchResponse(query Query, expectedElasticSearchRequestJSON string) {
@@ -254,3 +255,43 @@ func TestElasticSearchQueryBuilder(t *testing.T) {
})
})
}
func makeTime(hour int) string {
	// unix time 1500000000 == 2017-07-14T02:40:00+00:00; multiplied by 1000
	// since the time range is expressed in epoch milliseconds
	return strconv.Itoa((1500000000 + hour*60*60) * 1000)
}
func getIndexListByTime(pattern string, interval string, hour int) string {
timeRange := &tsdb.TimeRange{
From: makeTime(0),
To: makeTime(hour),
}
return getIndexList(pattern, interval, timeRange)
}
func TestElasticsearchGetIndexList(t *testing.T) {
Convey("Test Elasticsearch getIndex ", t, func() {
Convey("Parse Interval Formats", func() {
So(getIndexListByTime("[logstash-]YYYY.MM.DD", "Daily", 48),
ShouldEqual, "logstash-2017.07.14,logstash-2017.07.15,logstash-2017.07.16")
So(len(strings.Split(getIndexListByTime("[logstash-]YYYY.MM.DD.HH", "Hourly", 3), ",")),
ShouldEqual, 4)
So(getIndexListByTime("[logstash-]YYYY.W", "Weekly", 100),
ShouldEqual, "logstash-2017.28,logstash-2017.29")
So(getIndexListByTime("[logstash-]YYYY.MM", "Monthly", 700),
ShouldEqual, "logstash-2017.07,logstash-2017.08")
So(getIndexListByTime("[logstash-]YYYY", "Yearly", 10000),
ShouldEqual, "logstash-2017,logstash-2018,logstash-2019")
})
Convey("No Interval", func() {
index := getIndexListByTime("logstash-test", "", 1)
So(index, ShouldEqual, "logstash-test")
})
})
}
@@ -2,9 +2,10 @@ package elasticsearch
import (
	"encoding/json"
	"testing"

	"github.com/grafana/grafana/pkg/tsdb"
	. "github.com/smartystreets/goconvey/convey"
)
func testElasticsearchResponse(body string, target Query) *tsdb.QueryResult {
......
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
"golang.org/x/net/context/ctxhttp"
)
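// timeSeriesQuery holds the parsed queries of a time series query request.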
type timeSeriesQuery struct {
queries []*Query
}
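// executeTimeSeriesQuery parses the raw query models, builds a multisearch
// payload, sends it to Elasticsearch and converts the responses into time
// series results.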
func (e *ElasticsearchExecutor) executeTimeSeriesQuery(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
result := &tsdb.Response{}
result.Results = make(map[string]*tsdb.QueryResult)
tsQueryParser := newTimeSeriesQueryParser(dsInfo)
query, err := tsQueryParser.parse(tsdbQuery)
if err != nil {
return nil, err
}
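// Each query contributes two newline-terminated lines to the multisearch
// payload: a header line and a request body line (see Query.Build).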
buff := bytes.Buffer{}
for _, q := range query.queries {
s, err := q.Build(tsdbQuery, dsInfo)
if err != nil {
return nil, err
}
buff.WriteString(s)
}
payload := buff.String()
	if setting.Env == setting.DEV {
		glog.Debug("Elasticsearch payload", "raw payload", payload)
	}
	glog.Info("Elasticsearch payload", "raw payload", payload)
req, err := e.createRequest(dsInfo, payload)
if err != nil {
return nil, err
}
httpClient, err := dsInfo.GetHttpClient()
if err != nil {
return nil, err
}
resp, err := ctxhttp.Do(ctx, httpClient, req)
if err != nil {
return nil, err
}
	if resp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("elasticsearch returned invalid status code: %v", resp.Status)
	}
var responses Responses
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
err = dec.Decode(&responses)
if err != nil {
return nil, err
}
for _, res := range responses.Responses {
if res.Err != nil {
return nil, errors.New(res.getErrMsg())
}
}
responseParser := ElasticsearchResponseParser{responses.Responses, query.queries}
queryRes := responseParser.getTimeSeries()
result.Results["A"] = queryRes
return result, nil
}
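// timeSeriesQueryParser converts the raw JSON model of each query in a
// tsdb.TsdbQuery into typed Query values.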
type timeSeriesQueryParser struct {
ds *models.DataSource
}
func newTimeSeriesQueryParser(ds *models.DataSource) *timeSeriesQueryParser {
return &timeSeriesQueryParser{
ds: ds,
}
}
func (p *timeSeriesQueryParser) parse(tsdbQuery *tsdb.TsdbQuery) (*timeSeriesQuery, error) {
queries := make([]*Query, 0)
for _, q := range tsdbQuery.Queries {
model := q.Model
timeField, err := model.Get("timeField").String()
if err != nil {
return nil, err
}
rawQuery := model.Get("query").MustString()
bucketAggs, err := p.parseBucketAggs(model)
if err != nil {
return nil, err
}
metrics, err := p.parseMetrics(model)
if err != nil {
return nil, err
}
alias := model.Get("alias").MustString("")
parsedInterval, err := tsdb.GetIntervalFrom(p.ds, model, time.Millisecond)
if err != nil {
return nil, err
}
queries = append(queries, &Query{
TimeField: timeField,
RawQuery: rawQuery,
BucketAggs: bucketAggs,
Metrics: metrics,
Alias: alias,
Interval: parsedInterval,
})
}
return &timeSeriesQuery{queries: queries}, nil
}
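// parseBucketAggs reads the bucket aggregations (e.g. terms, date_histogram)
// from the query model; type and id are required, field and settings are
// optional.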
func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
var err error
var result []*BucketAgg
for _, t := range model.Get("bucketAggs").MustArray() {
aggJson := simplejson.NewFromAny(t)
agg := &BucketAgg{}
agg.Type, err = aggJson.Get("type").String()
if err != nil {
return nil, err
}
agg.ID, err = aggJson.Get("id").String()
if err != nil {
return nil, err
}
agg.Field = aggJson.Get("field").MustString()
agg.Settings = simplejson.NewFromAny(aggJson.Get("settings").MustMap())
result = append(result, agg)
}
return result, nil
}
func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*Metric, error) {
var err error
var result []*Metric
for _, t := range model.Get("metrics").MustArray() {
metricJSON := simplejson.NewFromAny(t)
metric := &Metric{}
metric.Field = metricJSON.Get("field").MustString()
metric.Hide = metricJSON.Get("hide").MustBool(false)
metric.ID, err = metricJSON.Get("id").String()
if err != nil {
return nil, err
}
metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap())
metric.Type, err = metricJSON.Get("type").String()
if err != nil {
return nil, err
}
result = append(result, metric)
}
return result, nil
}
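// A minimal usage sketch of the parser (hypothetical model values; the test
// below exercises the full model shape):
//
//	model, _ := simplejson.NewJson([]byte(`{"timeField": "@timestamp", "query": "*"}`))
//	parser := newTimeSeriesQueryParser(&models.DataSource{})
//	tsQuery, err := parser.parse(&tsdb.TsdbQuery{Queries: []*tsdb.Query{{Model: model}}})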
package elasticsearch
import (
"testing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
func TestTimeSeriesQueryParser(t *testing.T) {
Convey("Test time series query parser", t, func() {
ds := &models.DataSource{}
p := newTimeSeriesQueryParser(ds)
Convey("Should be able to parse query", func() {
json, err := simplejson.NewJson([]byte(`{
"timeField": "@timestamp",
"query": "@metric:cpu",
"alias": "{{@hostname}} {{metric}}",
"metrics": [
{
"field": "@value",
"id": "1",
"meta": {},
"settings": {
"percents": [
"90"
]
},
"type": "percentiles"
},
{
"type": "count",
"field": "select field",
"id": "4",
"settings": {},
"meta": {}
}
],
"bucketAggs": [
{
"fake": true,
"field": "@hostname",
"id": "3",
"settings": {
"min_doc_count": 1,
"order": "desc",
"orderBy": "_term",
"size": "10"
},
"type": "terms"
},
{
"field": "@timestamp",
"id": "2",
"settings": {
"interval": "5m",
"min_doc_count": 0,
"trimEdges": 0
},
"type": "date_histogram"
}
]
}`))
So(err, ShouldBeNil)
tsdbQuery := &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
DataSource: ds,
Model: json,
},
},
}
tsQuery, err := p.parse(tsdbQuery)
So(err, ShouldBeNil)
So(tsQuery.queries, ShouldHaveLength, 1)
q := tsQuery.queries[0]
So(q.TimeField, ShouldEqual, "@timestamp")
So(q.RawQuery, ShouldEqual, "@metric:cpu")
So(q.Alias, ShouldEqual, "{{@hostname}} {{metric}}")
So(q.Metrics, ShouldHaveLength, 2)
So(q.Metrics[0].Field, ShouldEqual, "@value")
So(q.Metrics[0].ID, ShouldEqual, "1")
So(q.Metrics[0].Type, ShouldEqual, "percentiles")
So(q.Metrics[0].Hide, ShouldBeFalse)
So(q.Metrics[0].PipelineAggregate, ShouldEqual, "")
So(q.Metrics[0].Settings.Get("percents").MustStringArray()[0], ShouldEqual, "90")
So(q.Metrics[1].Field, ShouldEqual, "select field")
So(q.Metrics[1].ID, ShouldEqual, "4")
So(q.Metrics[1].Type, ShouldEqual, "count")
So(q.Metrics[1].Hide, ShouldBeFalse)
So(q.Metrics[1].PipelineAggregate, ShouldEqual, "")
So(q.Metrics[1].Settings.MustMap(), ShouldBeEmpty)
So(q.BucketAggs, ShouldHaveLength, 2)
So(q.BucketAggs[0].Field, ShouldEqual, "@hostname")
So(q.BucketAggs[0].ID, ShouldEqual, "3")
So(q.BucketAggs[0].Type, ShouldEqual, "terms")
So(q.BucketAggs[0].Settings.Get("min_doc_count").MustInt64(), ShouldEqual, 1)
So(q.BucketAggs[0].Settings.Get("order").MustString(), ShouldEqual, "desc")
So(q.BucketAggs[0].Settings.Get("orderBy").MustString(), ShouldEqual, "_term")
So(q.BucketAggs[0].Settings.Get("size").MustString(), ShouldEqual, "10")
So(q.BucketAggs[1].Field, ShouldEqual, "@timestamp")
So(q.BucketAggs[1].ID, ShouldEqual, "2")
So(q.BucketAggs[1].Type, ShouldEqual, "date_histogram")
So(q.BucketAggs[1].Settings.Get("interval").MustString(), ShouldEqual, "5m")
So(q.BucketAggs[1].Settings.Get("min_doc_count").MustInt64(), ShouldEqual, 0)
So(q.BucketAggs[1].Settings.Get("trimEdges").MustInt64(), ShouldEqual, 0)
})
})
}