Commit 0229d28d by bergquist

remove unused structs

parent 55f1b36e
......@@ -31,7 +31,7 @@ func QueryMetrics(c *middleware.Context, reqDto dtos.MetricRequest) Response {
return ApiError(500, "failed to fetch data source", err)
}
request := &tsdb.Request{TimeRange: timeRange}
request := &tsdb.TsdbQuery{TimeRange: timeRange}
for _, query := range reqDto.Queries {
request.Queries = append(request.Queries, &tsdb.Query{
......@@ -98,7 +98,7 @@ func GetTestDataRandomWalk(c *middleware.Context) Response {
intervalMs := c.QueryInt64("intervalMs")
timeRange := tsdb.NewTimeRange(from, to)
request := &tsdb.Request{TimeRange: timeRange}
request := &tsdb.TsdbQuery{TimeRange: timeRange}
request.Queries = append(request.Queries, &tsdb.Query{
RefId: "A",
......
......@@ -22,7 +22,6 @@ import (
_ "github.com/grafana/grafana/pkg/services/alerting/notifiers"
_ "github.com/grafana/grafana/pkg/tsdb/graphite"
_ "github.com/grafana/grafana/pkg/tsdb/influxdb"
_ "github.com/grafana/grafana/pkg/tsdb/mqe"
_ "github.com/grafana/grafana/pkg/tsdb/mysql"
_ "github.com/grafana/grafana/pkg/tsdb/opentsdb"
......
......@@ -139,8 +139,8 @@ func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange *
return result, nil
}
func (c *QueryCondition) getRequestForAlertRule(datasource *m.DataSource, timeRange *tsdb.TimeRange) *tsdb.Request {
req := &tsdb.Request{
func (c *QueryCondition) getRequestForAlertRule(datasource *m.DataSource, timeRange *tsdb.TimeRange) *tsdb.TsdbQuery {
req := &tsdb.TsdbQuery{
TimeRange: timeRange,
Queries: []*tsdb.Query{
{
......
......@@ -168,7 +168,7 @@ func (ctx *queryConditionTestContext) exec() (*alerting.ConditionResult, error)
ctx.condition = condition
condition.HandleRequest = func(context context.Context, req *tsdb.Request) (*tsdb.Response, error) {
condition.HandleRequest = func(context context.Context, req *tsdb.TsdbQuery) (*tsdb.Response, error) {
return &tsdb.Response{
Results: map[string]*tsdb.QueryResult{
"A": {Series: ctx.series},
......
......@@ -4,7 +4,7 @@ import "context"
type Batch struct {
DataSourceId int64
Queries QuerySlice
Queries []*Query
Depends map[string]bool
Done bool
Started bool
......@@ -12,7 +12,7 @@ type Batch struct {
type BatchSlice []*Batch
func newBatch(dsId int64, queries QuerySlice) *Batch {
func newBatch(dsId int64, queries []*Query) *Batch {
return &Batch{
DataSourceId: dsId,
Queries: queries,
......@@ -36,7 +36,10 @@ func (bg *Batch) process(ctx context.Context, resultChan chan *BatchResult, tsdb
return
}
res := executor.Execute(ctx, bg.Queries, tsdbQuery)
res := executor.Execute(ctx, &TsdbQuery{
Queries: bg.Queries,
TimeRange: tsdbQuery.TimeRange,
})
bg.Done = true
resultChan <- res
}
......@@ -55,14 +58,14 @@ func (bg *Batch) allDependenciesAreIn(res *Response) bool {
return true
}
func getBatches(req *Request) (BatchSlice, error) {
func getBatches(req *TsdbQuery) (BatchSlice, error) {
batches := make(BatchSlice, 0)
for _, query := range req.Queries {
if foundBatch := findMatchingBatchGroup(query, batches); foundBatch != nil {
foundBatch.addQuery(query)
} else {
newBatch := newBatch(query.DataSource.Id, QuerySlice{query})
newBatch := newBatch(query.DataSource.Id, []*Query{query})
batches = append(batches, newBatch)
for _, refId := range query.Depends {
......
......@@ -8,7 +8,7 @@ import (
)
type Executor interface {
Execute(ctx context.Context, queries QuerySlice, query *TsdbQuery) *BatchResult
Execute(ctx context.Context, query *TsdbQuery) *BatchResult
}
var registry map[string]GetExecutorFn
......
......@@ -20,9 +20,9 @@ func NewFakeExecutor(dsInfo *models.DataSource) (*FakeExecutor, error) {
}, nil
}
func (e *FakeExecutor) Execute(ctx context.Context, queries QuerySlice, context *TsdbQuery) *BatchResult {
func (e *FakeExecutor) Execute(ctx context.Context, context *TsdbQuery) *BatchResult {
result := &BatchResult{QueryResults: make(map[string]*QueryResult)}
for _, query := range queries {
for _, query := range context.Queries {
if results, has := e.results[query.RefId]; has {
result.QueryResults[query.RefId] = results
}
......
......@@ -47,7 +47,7 @@ func init() {
tsdb.RegisterExecutor("graphite", NewGraphiteExecutor)
}
func (e *GraphiteExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.TsdbQuery) *tsdb.BatchResult {
func (e *GraphiteExecutor) Execute(ctx context.Context, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
from := "-" + formatTimeRange(context.TimeRange.From)
......@@ -61,7 +61,7 @@ func (e *GraphiteExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
"maxDataPoints": []string{"500"},
}
for _, query := range queries {
for _, query := range context.Queries {
if fullTarget, err := query.Model.Get("targetFull").String(); err == nil {
target = fixIntervalFormat(fullTarget)
} else {
......
......@@ -47,10 +47,10 @@ func init() {
tsdb.RegisterExecutor("influxdb", NewInfluxDBExecutor)
}
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.TsdbQuery) *tsdb.BatchResult {
func (e *InfluxDBExecutor) Execute(ctx context.Context, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
query, err := e.getQuery(queries, context)
query, err := e.getQuery(context.Queries, context)
if err != nil {
return result.WithError(err)
}
......@@ -98,7 +98,7 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
return result
}
func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.TsdbQuery) (*Query, error) {
func (e *InfluxDBExecutor) getQuery(queries []*tsdb.Query, context *tsdb.TsdbQuery) (*Query, error) {
for _, v := range queries {
query, err := e.QueryParser.Parse(v.Model, e.DataSource)
......
......@@ -8,14 +8,7 @@ import (
type TsdbQuery struct {
TimeRange *TimeRange
Queries QuerySlice
}
func NewQueryContext(queries QuerySlice, timeRange *TimeRange) *TsdbQuery {
return &TsdbQuery{
TimeRange: timeRange,
Queries: queries,
}
Queries []*Query
}
type Query struct {
......@@ -24,18 +17,10 @@ type Query struct {
Depends []string
DataSource *models.DataSource
Results []*TimeSeries
Exclude bool
MaxDataPoints int64
IntervalMs int64
}
type QuerySlice []*Query
type Request struct {
TimeRange *TimeRange
Queries QuerySlice
}
type Response struct {
BatchTimings []*BatchTiming `json:"timings"`
Results map[string]*QueryResult `json:"results"`
......
package mqe
import (
"context"
"net/http"
"net/url"
"path"
"strings"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
"golang.org/x/net/context/ctxhttp"
)
var (
MaxWorker int = 4
)
type apiClient struct {
*models.DataSource
log log.Logger
httpClient *http.Client
responseParser *ResponseParser
}
func NewApiClient(httpClient *http.Client, datasource *models.DataSource) *apiClient {
return &apiClient{
DataSource: datasource,
log: log.New("tsdb.mqe"),
httpClient: httpClient,
responseParser: NewResponseParser(),
}
}
func (e *apiClient) PerformRequests(ctx context.Context, queries []QueryToSend) (*tsdb.QueryResult, error) {
queryResult := &tsdb.QueryResult{}
queryCount := len(queries)
jobsChan := make(chan QueryToSend, queryCount)
resultChan := make(chan []*tsdb.TimeSeries, queryCount)
errorsChan := make(chan error, 1)
for w := 1; w <= MaxWorker; w++ {
go e.spawnWorker(ctx, w, jobsChan, resultChan, errorsChan)
}
for _, v := range queries {
jobsChan <- v
}
close(jobsChan)
resultCounter := 0
for {
select {
case timeseries := <-resultChan:
queryResult.Series = append(queryResult.Series, timeseries...)
resultCounter++
if resultCounter == queryCount {
close(resultChan)
return queryResult, nil
}
case err := <-errorsChan:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
func (e *apiClient) spawnWorker(ctx context.Context, id int, jobs chan QueryToSend, results chan []*tsdb.TimeSeries, errors chan error) {
e.log.Debug("Spawning worker", "id", id)
for query := range jobs {
if setting.Env == setting.DEV {
e.log.Debug("Sending request", "query", query.RawQuery)
}
req, err := e.createRequest(query.RawQuery)
resp, err := ctxhttp.Do(ctx, e.httpClient, req)
if err != nil {
errors <- err
return
}
series, err := e.responseParser.Parse(resp, query)
if err != nil {
errors <- err
return
}
results <- series
}
e.log.Debug("Worker is complete", "id", id)
}
func (e *apiClient) createRequest(query string) (*http.Request, error) {
u, err := url.Parse(e.Url)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, "query")
payload := simplejson.New()
payload.Set("query", query)
jsonPayload, err := payload.MarshalJSON()
if err != nil {
return nil, err
}
req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(string(jsonPayload)))
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", "Grafana")
req.Header.Set("Content-Type", "application/json")
if e.BasicAuth {
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
}
return req, nil
}
package mqe
import (
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
)
func NewQueryParser() *QueryParser {
return &QueryParser{}
}
type QueryParser struct{}
func (qp *QueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*Query, error) {
query := &Query{TimeRange: queryContext.TimeRange}
query.AddClusterToAlias = model.Get("addClusterToAlias").MustBool(false)
query.AddHostToAlias = model.Get("addHostToAlias").MustBool(false)
query.UseRawQuery = model.Get("rawQuery").MustBool(false)
query.RawQuery = model.Get("query").MustString("")
query.Cluster = model.Get("cluster").MustStringArray([]string{})
query.Hosts = model.Get("hosts").MustStringArray([]string{})
var metrics []Metric
var err error
for _, metricsObj := range model.Get("metrics").MustArray() {
metricJson := simplejson.NewFromAny(metricsObj)
var m Metric
m.Alias = metricJson.Get("alias").MustString("")
m.Metric, err = metricJson.Get("metric").String()
if err != nil {
return nil, err
}
metrics = append(metrics, m)
}
query.Metrics = metrics
var functions []Function
for _, functionListObj := range model.Get("functionList").MustArray() {
functionListJson := simplejson.NewFromAny(functionListObj)
var f Function
f.Func = functionListJson.Get("func").MustString("")
if err != nil {
return nil, err
}
if f.Func != "" {
functions = append(functions, f)
}
}
query.FunctionList = functions
return query, nil
}
package mqe
import (
"testing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
func TestMQEQueryParser(t *testing.T) {
Convey("MQE query parser", t, func() {
parser := &QueryParser{}
dsInfo := &models.DataSource{JsonData: simplejson.New()}
queryContext := &tsdb.TsdbQuery{}
Convey("can parse simple mqe model", func() {
json := `
{
"cluster": [],
"hosts": [
"staples-lab-1"
],
"metrics": [
{
"metric": "os.cpu.all*"
}
],
"rawQuery": "",
"refId": "A"
}
`
modelJson, err := simplejson.NewJson([]byte(json))
So(err, ShouldBeNil)
query, err := parser.Parse(modelJson, dsInfo, queryContext)
So(err, ShouldBeNil)
So(query.UseRawQuery, ShouldBeFalse)
So(len(query.Cluster), ShouldEqual, 0)
So(query.Hosts[0], ShouldEqual, "staples-lab-1")
So(query.Metrics[0].Metric, ShouldEqual, "os.cpu.all*")
})
Convey("can parse multi serie mqe model", func() {
json := `
{
"cluster": [
"demoapp"
],
"hosts": [
"staples-lab-1"
],
"metrics": [
{
"metric": "os.cpu.all.active_percentage"
},
{
"metric": "os.disk.sda.io_time"
}
],
"functionList": [
{
"func": "aggregate.min"
},
{
"func": "aggregate.max"
}
],
"rawQuery": "",
"refId": "A",
"addClusterToAlias": true,
"addHostToAlias": true
}
`
modelJson, err := simplejson.NewJson([]byte(json))
So(err, ShouldBeNil)
query, err := parser.Parse(modelJson, dsInfo, queryContext)
So(err, ShouldBeNil)
So(query.UseRawQuery, ShouldBeFalse)
So(query.Cluster[0], ShouldEqual, "demoapp")
So(query.Metrics[0].Metric, ShouldEqual, "os.cpu.all.active_percentage")
So(query.Metrics[1].Metric, ShouldEqual, "os.disk.sda.io_time")
So(query.FunctionList[0].Func, ShouldEqual, "aggregate.min")
So(query.FunctionList[1].Func, ShouldEqual, "aggregate.max")
})
Convey("can parse raw query", func() {
json := `
{
"addClusterToAlias": true,
"addHostToAlias": true,
"cluster": [],
"hosts": [
"staples-lab-1"
],
"metrics": [
{
"alias": "cpu active",
"metric": "os.cpu.all.active_percentage"
},
{
"alias": "disk sda time",
"metric": "os.disk.sda.io_time"
}
],
"rawQuery": true,
"query": "raw-query",
"refId": "A"
}
`
modelJson, err := simplejson.NewJson([]byte(json))
So(err, ShouldBeNil)
query, err := parser.Parse(modelJson, dsInfo, queryContext)
So(err, ShouldBeNil)
So(query.UseRawQuery, ShouldBeTrue)
So(query.RawQuery, ShouldEqual, "raw-query")
So(query.AddClusterToAlias, ShouldBeTrue)
So(query.AddHostToAlias, ShouldBeTrue)
})
})
}
package mqe
import (
"context"
"net/http"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
)
type MQEExecutor struct {
*models.DataSource
queryParser *QueryParser
apiClient *apiClient
httpClient *http.Client
log log.Logger
tokenClient *TokenClient
}
func NewMQEExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) {
httpclient, err := dsInfo.GetHttpClient()
if err != nil {
return nil, err
}
return &MQEExecutor{
DataSource: dsInfo,
httpClient: httpclient,
log: log.New("tsdb.mqe"),
queryParser: NewQueryParser(),
apiClient: NewApiClient(httpclient, dsInfo),
tokenClient: NewTokenClient(dsInfo),
}, nil
}
func init() {
tsdb.RegisterExecutor("mqe-datasource", NewMQEExecutor)
}
type QueryToSend struct {
RawQuery string
Metric Metric
QueryRef *Query
}
func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
availableSeries, err := e.tokenClient.GetTokenData(ctx)
if err != nil {
return result.WithError(err)
}
var mqeQueries []*Query
for _, v := range queries {
q, err := e.queryParser.Parse(v.Model, e.DataSource, queryContext)
if err != nil {
return result.WithError(err)
}
mqeQueries = append(mqeQueries, q)
}
var rawQueries []QueryToSend
for _, v := range mqeQueries {
queries, err := v.Build(availableSeries.Metrics)
if err != nil {
return result.WithError(err)
}
rawQueries = append(rawQueries, queries...)
}
e.log.Debug("Sending request", "url", e.DataSource.Url)
queryResult, err := e.apiClient.PerformRequests(ctx, rawQueries)
if err != nil {
return result.WithError(err)
}
result.QueryResults = make(map[string]*tsdb.QueryResult)
result.QueryResults["A"] = queryResult
return result
}
package mqe
import (
"encoding/json"
"io/ioutil"
"net/http"
"strconv"
"strings"
"fmt"
"regexp"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/tsdb"
)
func NewResponseParser() *ResponseParser {
return &ResponseParser{
log: log.New("tsdb.mqe"),
}
}
var (
indexAliasPattern *regexp.Regexp
wildcardAliasPattern *regexp.Regexp
)
func init() {
indexAliasPattern = regexp.MustCompile(`\$(\d)`)
wildcardAliasPattern = regexp.MustCompile(`[*!]`)
}
type MQEResponse struct {
Success bool `json:"success"`
Name string `json:"name"`
Body []MQEResponseSerie `json:"body"`
}
type ResponseTimeRange struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Resolution int64 `json:"Resolution"`
}
type MQEResponseSerie struct {
Query string `json:"query"`
Name string `json:"name"`
Type string `json:"type"`
Series []MQESerie `json:"series"`
TimeRange ResponseTimeRange `json:"timerange"`
}
type MQESerie struct {
Values []null.Float `json:"values"`
Tagset map[string]string `json:"tagset"`
}
type ResponseParser struct {
log log.Logger
}
func (parser *ResponseParser) Parse(res *http.Response, queryRef QueryToSend) ([]*tsdb.TimeSeries, error) {
body, err := ioutil.ReadAll(res.Body)
defer res.Body.Close()
if err != nil {
return nil, err
}
if res.StatusCode/100 != 2 {
parser.log.Error("Request failed", "status code", res.StatusCode, "body", string(body))
return nil, fmt.Errorf("Returned invalid statuscode")
}
var data *MQEResponse = &MQEResponse{}
err = json.Unmarshal(body, data)
if err != nil {
parser.log.Info("Failed to unmarshal response", "error", err, "status", res.Status, "body", string(body))
return nil, err
}
if !data.Success {
return nil, fmt.Errorf("Request failed.")
}
var series []*tsdb.TimeSeries
for _, body := range data.Body {
for _, mqeSerie := range body.Series {
serie := &tsdb.TimeSeries{
Tags: map[string]string{},
Name: parser.formatLegend(body, mqeSerie, queryRef),
}
for key, value := range mqeSerie.Tagset {
serie.Tags[key] = value
}
for i, value := range mqeSerie.Values {
timestamp := body.TimeRange.Start + int64(i)*body.TimeRange.Resolution
serie.Points = append(serie.Points, tsdb.NewTimePoint(value, float64(timestamp)))
}
series = append(series, serie)
}
}
return series, nil
}
func (parser *ResponseParser) formatLegend(body MQEResponseSerie, mqeSerie MQESerie, queryToSend QueryToSend) string {
namePrefix := ""
//append predefined tags to seriename
for key, value := range mqeSerie.Tagset {
if key == "cluster" && queryToSend.QueryRef.AddClusterToAlias {
namePrefix += value + " "
}
}
for key, value := range mqeSerie.Tagset {
if key == "host" && queryToSend.QueryRef.AddHostToAlias {
namePrefix += value + " "
}
}
return namePrefix + parser.formatName(body, queryToSend)
}
func (parser *ResponseParser) formatName(body MQEResponseSerie, queryToSend QueryToSend) string {
if indexAliasPattern.MatchString(queryToSend.Metric.Alias) {
return parser.indexAlias(body, queryToSend)
}
if wildcardAliasPattern.MatchString(queryToSend.Metric.Metric) && wildcardAliasPattern.MatchString(queryToSend.Metric.Alias) {
return parser.wildcardAlias(body, queryToSend)
}
return body.Name
}
func (parser *ResponseParser) wildcardAlias(body MQEResponseSerie, queryToSend QueryToSend) string {
regString := strings.Replace(queryToSend.Metric.Metric, `*`, `(.*)`, 1)
reg, err := regexp.Compile(regString)
if err != nil {
return queryToSend.Metric.Alias
}
matches := reg.FindAllStringSubmatch(queryToSend.RawQuery, -1)
if len(matches) == 0 || len(matches[0]) < 2 {
return queryToSend.Metric.Alias
}
return matches[0][1]
}
func (parser *ResponseParser) indexAlias(body MQEResponseSerie, queryToSend QueryToSend) string {
queryNameParts := strings.Split(queryToSend.Metric.Metric, `.`)
name := indexAliasPattern.ReplaceAllStringFunc(queryToSend.Metric.Alias, func(in string) string {
positionName := strings.TrimSpace(strings.Replace(in, "$", "", 1))
pos, err := strconv.Atoi(positionName)
if err != nil {
return ""
}
for i, part := range queryNameParts {
if i == pos-1 {
return strings.TrimSpace(part)
}
}
return ""
})
return strings.Replace(name, " ", ".", -1)
}
package mqe
import (
"testing"
"net/http"
"strings"
"io/ioutil"
"github.com/grafana/grafana/pkg/components/null"
. "github.com/smartystreets/goconvey/convey"
)
var (
testJson string
)
func TestMQEResponseParser(t *testing.T) {
Convey("MQE response parser", t, func() {
parser := NewResponseParser()
Convey("Can parse response", func() {
queryRef := QueryToSend{
QueryRef: &Query{
AddClusterToAlias: true,
AddHostToAlias: true,
},
Metric: Metric{Alias: ""},
}
response := &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(strings.NewReader(testJson)),
}
res, err := parser.Parse(response, queryRef)
So(err, ShouldBeNil)
So(len(res), ShouldEqual, 2)
So(len(res[0].Points), ShouldEqual, 14)
So(res[0].Name, ShouldEqual, "demoapp staples-lab-1 os.disk.sda3.weighted_io_time")
startTime := 1479287280000
for i := 0; i < 11; i++ {
So(res[0].Points[i][0].Float64, ShouldEqual, i+1)
So(res[0].Points[i][1].Float64, ShouldEqual, startTime+(i*30000))
}
})
Convey("Can format legend", func() {
mqeSerie := MQESerie{
Tagset: map[string]string{
"cluster": "demoapp",
"host": "staples-lab-1",
},
Values: []null.Float{null.NewFloat(3, true)},
}
Convey("with empty alias", func() {
serie := MQEResponseSerie{Name: "os.disk.sda3.weighted_io_time"}
queryRef := QueryToSend{
QueryRef: &Query{
AddClusterToAlias: true,
AddHostToAlias: true,
},
Metric: Metric{Alias: ""},
}
legend := parser.formatLegend(serie, mqeSerie, queryRef)
So(legend, ShouldEqual, "demoapp staples-lab-1 os.disk.sda3.weighted_io_time")
})
Convey("with index alias (ex $2 $3)", func() {
serie := MQEResponseSerie{Name: "os.disk.sda3.weighted_io_time"}
queryRef := QueryToSend{
QueryRef: &Query{
AddClusterToAlias: true,
AddHostToAlias: true,
},
Metric: Metric{Alias: "$2 $3", Metric: "os.disk.sda3.weighted_io_time"},
}
legend := parser.formatLegend(serie, mqeSerie, queryRef)
So(legend, ShouldEqual, "demoapp staples-lab-1 disk.sda3")
})
Convey("with wildcard alias", func() {
serie := MQEResponseSerie{Name: "os.disk.sda3.weighted_io_time", Query: "os.disk.*"}
queryRef := QueryToSend{
QueryRef: &Query{
AddClusterToAlias: true,
AddHostToAlias: true,
},
RawQuery: "os.disk.sda3.weighted_io_time",
Metric: Metric{Alias: "*", Metric: "os.disk.*.weighted_io_time"},
}
legend := parser.formatLegend(serie, mqeSerie, queryRef)
So(legend, ShouldEqual, "demoapp staples-lab-1 sda3")
})
})
})
}
func init() {
testJson = `{
"success": true,
"name": "select",
"body": [
{
"query": "os.disk.sda3.weighted_io_time",
"name": "os.disk.sda3.weighted_io_time",
"type": "series",
"series": [
{
"tagset": {
"cluster": "demoapp",
"host": "staples-lab-1"
},
"values": [1,2,3,4,5,6,7,8,9,10,11, null, null, null]
},
{
"tagset": {
"cluster": "demoapp",
"host": "staples-lab-2"
},
"values": [11,10,9,8,7,6,5,4,3,2,1]
}
],
"timerange": {
"start": 1479287280000,
"end": 1479287580000,
"resolution": 30000
}
}
],
"metadata": {
"description": {
"cluster": [
"demoapp"
],
"host": [
"staples-lab-1",
"staples-lab-2"
]
},
"notes": null,
"profile": [
{
"name": "Parsing Query",
"start": "2016-11-16T04:16:21.874354721-05:00",
"finish": "2016-11-16T04:16:21.874762291-05:00"
},
{
"name": "Cassandra GetAllTags",
"start": "2016-11-16T04:16:21.874907171-05:00",
"finish": "2016-11-16T04:16:21.876401922-05:00"
},
{
"name": "CachedMetricMetadataAPI_GetAllTags_Expired",
"start": "2016-11-16T04:16:21.874904751-05:00",
"finish": "2016-11-16T04:16:21.876407852-05:00"
},
{
"name": "CachedMetricMetadataAPI_GetAllTags",
"start": "2016-11-16T04:16:21.874899491-05:00",
"finish": "2016-11-16T04:16:21.876410382-05:00"
},
{
"name": "Blueflood FetchSingleTimeseries Resolution",
"description": "os.disk.sda3.weighted_io_time [app=demoapp,host=staples-lab-1]\n at 30s",
"start": "2016-11-16T04:16:21.876623312-05:00",
"finish": "2016-11-16T04:16:21.881763444-05:00"
},
{
"name": "Blueflood FetchSingleTimeseries Resolution",
"description": "os.disk.sda3.weighted_io_time [app=demoapp,host=staples-lab-2]\n at 30s",
"start": "2016-11-16T04:16:21.876642682-05:00",
"finish": "2016-11-16T04:16:21.881895914-05:00"
},
{
"name": "Blueflood FetchMultipleTimeseries",
"start": "2016-11-16T04:16:21.876418022-05:00",
"finish": "2016-11-16T04:16:21.881921474-05:00"
}
]
}
}
`
}
package mqe
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"time"
"golang.org/x/net/context/ctxhttp"
"strconv"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/patrickmn/go-cache"
)
var tokenCache *cache.Cache
func init() {
tokenCache = cache.New(5*time.Minute, 30*time.Second)
}
type TokenClient struct {
log log.Logger
Datasource *models.DataSource
HttpClient *http.Client
}
func NewTokenClient(datasource *models.DataSource) *TokenClient {
httpClient, _ := datasource.GetHttpClient()
return &TokenClient{
log: log.New("tsdb.mqe.tokenclient"),
Datasource: datasource,
HttpClient: httpClient,
}
}
func (client *TokenClient) GetTokenData(ctx context.Context) (*TokenBody, error) {
key := strconv.FormatInt(client.Datasource.Id, 10)
item, found := tokenCache.Get(key)
if found {
if result, ok := item.(*TokenBody); ok {
return result, nil
}
}
b, err := client.RequestTokenData(ctx)
if err != nil {
return nil, err
}
tokenCache.Set(key, b, cache.DefaultExpiration)
return b, nil
}
func (client *TokenClient) RequestTokenData(ctx context.Context) (*TokenBody, error) {
u, _ := url.Parse(client.Datasource.Url)
u.Path = path.Join(u.Path, "token")
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
client.log.Info("Failed to create request", "error", err)
}
res, err := ctxhttp.Do(ctx, client.HttpClient, req)
if err != nil {
return nil, err
}
body, err := ioutil.ReadAll(res.Body)
defer res.Body.Close()
if err != nil {
return nil, err
}
if res.StatusCode/100 != 2 {
client.log.Info("Request failed", "status", res.Status, "body", string(body))
return nil, fmt.Errorf("Request failed status: %v", res.Status)
}
var result *TokenResponse
err = json.Unmarshal(body, &result)
if err != nil {
client.log.Info("Failed to unmarshal response", "error", err, "status", res.Status, "body", string(body))
return nil, err
}
if !result.Success {
return nil, fmt.Errorf("Request failed for unknown reason.")
}
return &result.Body, nil
}
package mqe
import (
"context"
"testing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
. "github.com/smartystreets/goconvey/convey"
)
func TestTokenClient(t *testing.T) {
SkipConvey("Token client", t, func() {
dsInfo := &models.DataSource{
JsonData: simplejson.New(),
Url: "",
}
client := NewTokenClient(dsInfo)
body, err := client.RequestTokenData(context.TODO())
So(err, ShouldBeNil)
//So(len(body.Functions), ShouldBeGreaterThan, 1)
So(len(body.Metrics), ShouldBeGreaterThan, 1)
})
}
package mqe
import (
"fmt"
"strings"
"regexp"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/tsdb"
)
type Metric struct {
Metric string
Alias string
}
type Function struct {
Func string
}
type Query struct {
Metrics []Metric
Hosts []string
Cluster []string
FunctionList []Function
AddClusterToAlias bool
AddHostToAlias bool
TimeRange *tsdb.TimeRange
UseRawQuery bool
RawQuery string
}
var (
containsWildcardPattern *regexp.Regexp = regexp.MustCompile(`\*`)
)
func (q *Query) Build(availableSeries []string) ([]QueryToSend, error) {
var queriesToSend []QueryToSend
where := q.buildWhereClause()
functions := q.buildFunctionList()
for _, metric := range q.Metrics {
alias := ""
if metric.Alias != "" {
alias = fmt.Sprintf(" {%s}", metric.Alias)
}
if !containsWildcardPattern.Match([]byte(metric.Metric)) {
rawQuery := q.renderQuerystring(metric.Metric, functions, alias, where, q.TimeRange)
queriesToSend = append(queriesToSend, QueryToSend{
RawQuery: rawQuery,
QueryRef: q,
Metric: metric,
})
} else {
m := strings.Replace(metric.Metric, "*", ".*", -1)
mp, err := regexp.Compile(m)
if err != nil {
log.Error2("failed to compile regex for ", "metric", m)
continue
}
//TODO: this lookup should be cached
for _, wildcardMatch := range availableSeries {
if mp.Match([]byte(wildcardMatch)) {
rawQuery := q.renderQuerystring(wildcardMatch, functions, alias, where, q.TimeRange)
queriesToSend = append(queriesToSend, QueryToSend{
RawQuery: rawQuery,
QueryRef: q,
Metric: metric,
})
}
}
}
}
return queriesToSend, nil
}
func (q *Query) renderQuerystring(path, functions, alias, where string, timerange *tsdb.TimeRange) string {
return fmt.Sprintf(
"`%s`%s%s %s from %v to %v",
path,
functions,
alias,
where,
q.TimeRange.GetFromAsMsEpoch(),
q.TimeRange.GetToAsMsEpoch())
}
func (q *Query) buildFunctionList() string {
functions := ""
for _, v := range q.FunctionList {
functions = fmt.Sprintf("%s|%s", functions, v.Func)
}
return functions
}
func (q *Query) buildWhereClause() string {
hasApps := len(q.Cluster) > 0
hasHosts := len(q.Hosts) > 0
where := ""
if hasHosts || hasApps {
where += "where "
}
if hasApps {
apps := strings.Join(q.Cluster, "', '")
where += fmt.Sprintf("cluster in ('%s')", apps)
}
if hasHosts && hasApps {
where += " and "
}
if hasHosts {
hosts := strings.Join(q.Hosts, "', '")
where += fmt.Sprintf("host in ('%s')", hosts)
}
return where
}
type TokenBody struct {
Metrics []string
}
type TokenResponse struct {
Success bool
Body TokenBody
}
package mqe
import (
"testing"
"time"
"fmt"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
func TestWildcardExpansion(t *testing.T) {
availableMetrics := []string{
"os.cpu.all.idle",
"os.cpu.1.idle",
"os.cpu.2.idle",
"os.cpu.3.idle",
}
now := time.Now()
from := now.Add((time.Minute*5)*-1).UnixNano() / int64(time.Millisecond)
to := now.UnixNano() / int64(time.Millisecond)
Convey("Can expanding query", t, func() {
Convey("Without wildcard series", func() {
query := &Query{
Metrics: []Metric{
{Metric: "os.cpu.3.idle", Alias: ""},
{Metric: "os.cpu.2.idle", Alias: ""},
{Metric: "os.cpu.1.idle", Alias: "cpu"},
},
Hosts: []string{"staples-lab-1", "staples-lab-2"},
Cluster: []string{"demoapp-1", "demoapp-2"},
AddClusterToAlias: false,
AddHostToAlias: false,
FunctionList: []Function{
{Func: "aggregate.min"},
},
TimeRange: &tsdb.TimeRange{Now: now, From: "5m", To: "now"},
}
expandeQueries, err := query.Build(availableMetrics)
So(err, ShouldBeNil)
So(len(expandeQueries), ShouldEqual, 3)
So(expandeQueries[0].RawQuery, ShouldEqual, fmt.Sprintf("`os.cpu.3.idle`|aggregate.min where cluster in ('demoapp-1', 'demoapp-2') and host in ('staples-lab-1', 'staples-lab-2') from %v to %v", from, to))
So(expandeQueries[1].RawQuery, ShouldEqual, fmt.Sprintf("`os.cpu.2.idle`|aggregate.min where cluster in ('demoapp-1', 'demoapp-2') and host in ('staples-lab-1', 'staples-lab-2') from %v to %v", from, to))
So(expandeQueries[2].RawQuery, ShouldEqual, fmt.Sprintf("`os.cpu.1.idle`|aggregate.min {cpu} where cluster in ('demoapp-1', 'demoapp-2') and host in ('staples-lab-1', 'staples-lab-2') from %v to %v", from, to))
})
Convey("With two aggregate functions", func() {
query := &Query{
Metrics: []Metric{
{Metric: "os.cpu.3.idle", Alias: ""},
},
Hosts: []string{"staples-lab-1", "staples-lab-2"},
Cluster: []string{"demoapp-1", "demoapp-2"},
AddClusterToAlias: false,
AddHostToAlias: false,
FunctionList: []Function{
{Func: "aggregate.min"},
{Func: "aggregate.max"},
},
TimeRange: &tsdb.TimeRange{Now: now, From: "5m", To: "now"},
}
expandeQueries, err := query.Build(availableMetrics)
So(err, ShouldBeNil)
So(len(expandeQueries), ShouldEqual, 1)
So(expandeQueries[0].RawQuery, ShouldEqual, fmt.Sprintf("`os.cpu.3.idle`|aggregate.min|aggregate.max where cluster in ('demoapp-1', 'demoapp-2') and host in ('staples-lab-1', 'staples-lab-2') from %v to %v", from, to))
})
Convey("Containing wildcard series", func() {
query := &Query{
Metrics: []Metric{
{Metric: "os.cpu*", Alias: ""},
},
Hosts: []string{"staples-lab-1"},
AddClusterToAlias: false,
AddHostToAlias: false,
TimeRange: &tsdb.TimeRange{Now: now, From: "5m", To: "now"},
}
expandeQueries, err := query.Build(availableMetrics)
So(err, ShouldBeNil)
So(len(expandeQueries), ShouldEqual, 4)
So(expandeQueries[0].RawQuery, ShouldEqual, fmt.Sprintf("`os.cpu.all.idle` where host in ('staples-lab-1') from %v to %v", from, to))
So(expandeQueries[1].RawQuery, ShouldEqual, fmt.Sprintf("`os.cpu.1.idle` where host in ('staples-lab-1') from %v to %v", from, to))
So(expandeQueries[2].RawQuery, ShouldEqual, fmt.Sprintf("`os.cpu.2.idle` where host in ('staples-lab-1') from %v to %v", from, to))
So(expandeQueries[3].RawQuery, ShouldEqual, fmt.Sprintf("`os.cpu.3.idle` where host in ('staples-lab-1') from %v to %v", from, to))
})
})
}
......@@ -81,7 +81,7 @@ func (e *MysqlExecutor) initEngine() error {
return nil
}
func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.TsdbQuery) *tsdb.BatchResult {
func (e *MysqlExecutor) Execute(ctx context.Context, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{
QueryResults: make(map[string]*tsdb.QueryResult),
}
......@@ -91,7 +91,7 @@ func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, co
defer session.Close()
db := session.DB()
for _, query := range queries {
for _, query := range context.Queries {
rawSql := query.Model.Get("rawSql").MustString()
if rawSql == "" {
continue
......
......@@ -48,7 +48,7 @@ func init() {
tsdb.RegisterExecutor("opentsdb", NewOpenTsdbExecutor)
}
func (e *OpenTsdbExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
func (e *OpenTsdbExecutor) Execute(ctx context.Context, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
var tsdbQuery OpenTsdbQuery
......@@ -56,7 +56,7 @@ func (e *OpenTsdbExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
tsdbQuery.Start = queryContext.TimeRange.GetFromAsMsEpoch()
tsdbQuery.End = queryContext.TimeRange.GetToAsMsEpoch()
for _, query := range queries {
for _, query := range queryContext.Queries {
metric := e.buildMetric(query)
tsdbQuery.Queries = append(tsdbQuery.Queries, metric)
}
......
......@@ -84,7 +84,7 @@ func (e *PrometheusExecutor) getClient() (apiv1.API, error) {
return apiv1.NewAPI(client), nil
}
func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
func (e *PrometheusExecutor) Execute(ctx context.Context, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
client, err := e.getClient()
......@@ -92,7 +92,7 @@ func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlic
return result.WithError(err)
}
query, err := parseQuery(queries, queryContext)
query, err := parseQuery(queryContext.Queries, queryContext)
if err != nil {
return result.WithError(err)
}
......@@ -142,7 +142,7 @@ func formatLegend(metric model.Metric, query *PrometheusQuery) string {
return string(result)
}
func parseQuery(queries tsdb.QuerySlice, queryContext *tsdb.TsdbQuery) (*PrometheusQuery, error) {
func parseQuery(queries []*tsdb.Query, queryContext *tsdb.TsdbQuery) (*PrometheusQuery, error) {
queryModel := queries[0]
expr, err := queryModel.Model.Get("expr").String()
......
......@@ -4,10 +4,13 @@ import (
"context"
)
type HandleRequestFunc func(ctx context.Context, req *Request) (*Response, error)
type HandleRequestFunc func(ctx context.Context, req *TsdbQuery) (*Response, error)
func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
tsdbQuery := NewQueryContext(req.Queries, req.TimeRange)
func HandleRequest(ctx context.Context, req *TsdbQuery) (*Response, error) {
tsdbQuery := &TsdbQuery{
Queries: req.Queries,
TimeRange: req.TimeRange,
}
batches, err := getBatches(req)
if err != nil {
......
......@@ -24,11 +24,11 @@ func init() {
tsdb.RegisterExecutor("grafana-testdata-datasource", NewTestDataExecutor)
}
func (e *TestDataExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.TsdbQuery) *tsdb.BatchResult {
func (e *TestDataExecutor) Execute(ctx context.Context, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
result.QueryResults = make(map[string]*tsdb.QueryResult)
for _, query := range queries {
for _, query := range context.Queries {
scenarioId := query.Model.Get("scenarioId").MustString("random_walk")
if scenario, exist := ScenarioRegistry[scenarioId]; exist {
result.QueryResults[query.RefId] = scenario.Handler(query, context)
......
......@@ -14,8 +14,8 @@ func TestMetricQuery(t *testing.T) {
Convey("When batches groups for query", t, func() {
Convey("Given 3 queries for 2 data sources", func() {
request := &Request{
Queries: QuerySlice{
request := &TsdbQuery{
Queries: []*Query{
{RefId: "A", DataSource: &models.DataSource{Id: 1}},
{RefId: "B", DataSource: &models.DataSource{Id: 1}},
{RefId: "C", DataSource: &models.DataSource{Id: 2}},
......@@ -31,8 +31,8 @@ func TestMetricQuery(t *testing.T) {
})
Convey("Given query 2 depends on query 1", func() {
request := &Request{
Queries: QuerySlice{
request := &TsdbQuery{
Queries: []*Query{
{RefId: "A", DataSource: &models.DataSource{Id: 1}},
{RefId: "B", DataSource: &models.DataSource{Id: 2}},
{RefId: "C", DataSource: &models.DataSource{Id: 3}, Depends: []string{"A", "B"}},
......@@ -55,8 +55,8 @@ func TestMetricQuery(t *testing.T) {
})
Convey("When executing request with one query", t, func() {
req := &Request{
Queries: QuerySlice{
req := &TsdbQuery{
Queries: []*Query{
{RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}},
},
}
......@@ -74,8 +74,8 @@ func TestMetricQuery(t *testing.T) {
})
Convey("When executing one request with two queries from same data source", t, func() {
req := &Request{
Queries: QuerySlice{
req := &TsdbQuery{
Queries: []*Query{
{RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}},
{RefId: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}},
},
......@@ -100,8 +100,8 @@ func TestMetricQuery(t *testing.T) {
})
Convey("When executing one request with three queries from different datasources", t, func() {
req := &Request{
Queries: QuerySlice{
req := &TsdbQuery{
Queries: []*Query{
{RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}},
{RefId: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}},
{RefId: "C", DataSource: &models.DataSource{Id: 2, Type: "test"}},
......@@ -117,8 +117,8 @@ func TestMetricQuery(t *testing.T) {
})
Convey("When query uses data source of unknown type", t, func() {
req := &Request{
Queries: QuerySlice{
req := &TsdbQuery{
Queries: []*Query{
{RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "asdasdas"}},
},
}
......@@ -128,8 +128,8 @@ func TestMetricQuery(t *testing.T) {
})
Convey("When executing request that depend on other query", t, func() {
req := &Request{
Queries: QuerySlice{
req := &TsdbQuery{
Queries: []*Query{
{
RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"},
},
......@@ -150,7 +150,7 @@ func TestMetricQuery(t *testing.T) {
fakeExecutor.HandleQuery("B", func(c *TsdbQuery) *QueryResult {
return &QueryResult{
Series: TimeSeriesSlice{
&TimeSeries{Name: "Bres+" + c.Results["A"].Series[0].Name},
&TimeSeries{Name: "Bres+Ares"},
}}
})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment