Commit aa7292fa by bergquist

mqe: adds support for wildcard and index aliases

parent bccb6500
......@@ -86,7 +86,7 @@ func (e *apiClient) spawnWorker(ctx context.Context, id int, jobs chan QueryToSe
return
}
series, err := e.responseParser.Parse(resp, query.QueryRef)
series, err := e.responseParser.Parse(resp, query)
if err != nil {
errors <- err
return
......
......@@ -40,6 +40,7 @@ func init() {
type QueryToSend struct {
RawQuery string
Metric Metric
QueryRef *Query
}
......
......@@ -4,9 +4,13 @@ import (
"encoding/json"
"io/ioutil"
"net/http"
"strconv"
"strings"
"fmt"
"regexp"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/tsdb"
......@@ -18,6 +22,16 @@ func NewResponseParser() *ResponseParser {
}
}
var (
indexAliasPattern *regexp.Regexp
wildcardAliasPattern *regexp.Regexp
)
func init() {
indexAliasPattern = regexp.MustCompile(`\$(\d)`)
wildcardAliasPattern = regexp.MustCompile(`[*!]`)
}
type MQEResponse struct {
Success bool `json:"success"`
Name string `json:"name"`
......@@ -47,7 +61,7 @@ type ResponseParser struct {
log log.Logger
}
func (parser *ResponseParser) Parse(res *http.Response, queryRef *Query) ([]*tsdb.TimeSeries, error) {
func (parser *ResponseParser) Parse(res *http.Response, queryRef QueryToSend) ([]*tsdb.TimeSeries, error) {
body, err := ioutil.ReadAll(res.Body)
defer res.Body.Close()
if err != nil {
......@@ -73,30 +87,90 @@ func (parser *ResponseParser) Parse(res *http.Response, queryRef *Query) ([]*tsd
var series []*tsdb.TimeSeries
for _, body := range data.Body {
for _, mqeSerie := range body.Series {
serie := &tsdb.TimeSeries{
Tags: map[string]string{},
Name: parser.formatLegend(body, mqeSerie, queryRef),
}
for key, value := range mqeSerie.Tagset {
serie.Tags[key] = value
}
for i, value := range mqeSerie.Values {
timestamp := body.TimeRange.Start + int64(i)*body.TimeRange.Resolution
serie.Points = append(serie.Points, tsdb.NewTimePoint(value, float64(timestamp)))
}
series = append(series, serie)
}
}
return series, nil
}
func (parser *ResponseParser) formatLegend(body MQEResponseSerie, mqeSerie MQESerie, queryToSend QueryToSend) string {
namePrefix := ""
//append predefined tags to seriename
for key, value := range mqeSerie.Tagset {
if key == "cluster" && queryRef.AddClusterToAlias {
if key == "cluster" && queryToSend.QueryRef.AddClusterToAlias {
namePrefix += value + " "
}
}
for key, value := range mqeSerie.Tagset {
if key == "host" && queryRef.AddHostToAlias {
if key == "host" && queryToSend.QueryRef.AddHostToAlias {
namePrefix += value + " "
}
}
serie := &tsdb.TimeSeries{Name: namePrefix + body.Name}
return namePrefix + parser.formatName(body, queryToSend)
}
func (parser *ResponseParser) formatName(body MQEResponseSerie, queryToSend QueryToSend) string {
if indexAliasPattern.MatchString(queryToSend.Metric.Alias) {
return parser.indexAlias(body, queryToSend)
}
if wildcardAliasPattern.MatchString(queryToSend.Metric.Metric) && wildcardAliasPattern.MatchString(queryToSend.Metric.Alias) {
return parser.wildcardAlias(body, queryToSend)
}
for i, value := range mqeSerie.Values {
timestamp := body.TimeRange.Start + int64(i)*body.TimeRange.Resolution
serie.Points = append(serie.Points, tsdb.NewTimePoint(value, float64(timestamp)))
return body.Name
}
func (parser *ResponseParser) wildcardAlias(body MQEResponseSerie, queryToSend QueryToSend) string {
regString := strings.Replace(queryToSend.Metric.Metric, `*`, `(.*)`, 1)
reg, err := regexp.Compile(regString)
if err != nil {
return queryToSend.Metric.Alias
}
series = append(series, serie)
matches := reg.FindAllStringSubmatch(queryToSend.RawQuery, -1)
if len(matches) == 0 || len(matches[0]) < 2 {
return queryToSend.Metric.Alias
}
return matches[0][1]
}
func (parser *ResponseParser) indexAlias(body MQEResponseSerie, queryToSend QueryToSend) string {
queryNameParts := strings.Split(body.Name, `.`)
name := indexAliasPattern.ReplaceAllStringFunc(queryToSend.Metric.Alias, func(in string) string {
positionName := strings.TrimSpace(strings.Replace(in, "$", "", 1))
pos, err := strconv.Atoi(positionName)
if err != nil {
return ""
}
return series, nil
for i, part := range queryNameParts {
if i == pos-1 {
return strings.TrimSpace(part)
}
}
return ""
})
return strings.Replace(name, " ", ".", -1)
}
......@@ -8,11 +8,12 @@ import (
"io/ioutil"
"github.com/grafana/grafana/pkg/components/null"
. "github.com/smartystreets/goconvey/convey"
)
var (
dummieJson string
testJson string
)
func TestMQEResponseParser(t *testing.T) {
......@@ -20,14 +21,17 @@ func TestMQEResponseParser(t *testing.T) {
parser := NewResponseParser()
Convey("Can parse response", func() {
queryRef := &Query{
queryRef := QueryToSend{
QueryRef: &Query{
AddClusterToAlias: true,
AddHostToAlias: true,
},
Metric: Metric{Alias: ""},
}
response := &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(strings.NewReader(dummieJson)),
Body: ioutil.NopCloser(strings.NewReader(testJson)),
}
res, err := parser.Parse(response, queryRef)
So(err, ShouldBeNil)
......@@ -39,12 +43,64 @@ func TestMQEResponseParser(t *testing.T) {
So(res[0].Points[i][0].Float64, ShouldEqual, i+1)
So(res[0].Points[i][1].Float64, ShouldEqual, startTime+(i*30000))
}
})
Convey("Can format legend", func() {
mqeSerie := MQESerie{
Tagset: map[string]string{
"cluster": "demoapp",
"host": "staples-lab-1",
},
Values: []null.Float{null.NewFloat(3, true)},
}
Convey("with empty alias", func() {
serie := MQEResponseSerie{Name: "os.disk.sda3.weighted_io_time"}
queryRef := QueryToSend{
QueryRef: &Query{
AddClusterToAlias: true,
AddHostToAlias: true,
},
Metric: Metric{Alias: ""},
}
legend := parser.formatLegend(serie, mqeSerie, queryRef)
So(legend, ShouldEqual, "demoapp staples-lab-1 os.disk.sda3.weighted_io_time")
})
Convey("with index alias (ex $2 $3)", func() {
serie := MQEResponseSerie{Name: "os.disk.sda3.weighted_io_time"}
queryRef := QueryToSend{
QueryRef: &Query{
AddClusterToAlias: true,
AddHostToAlias: true,
},
Metric: Metric{Alias: "$2 $3"},
}
legend := parser.formatLegend(serie, mqeSerie, queryRef)
So(legend, ShouldEqual, "demoapp staples-lab-1 disk.sda3")
})
Convey("with wildcard alias", func() {
serie := MQEResponseSerie{Name: "os.disk.sda3.weighted_io_time", Query: "os.disk.*"}
queryRef := QueryToSend{
QueryRef: &Query{
AddClusterToAlias: true,
AddHostToAlias: true,
},
RawQuery: "os.disk.sda3.weighted_io_time",
Metric: Metric{Alias: "*", Metric: "os.disk.*.weighted_io_time"},
}
legend := parser.formatLegend(serie, mqeSerie, queryRef)
So(legend, ShouldEqual, "demoapp staples-lab-1 sda3")
})
})
})
}
func init() {
dummieJson = `{
testJson = `{
"success": true,
"name": "select",
"body": [
......
......@@ -53,6 +53,7 @@ func (q *Query) Build(availableSeries []string) ([]QueryToSend, error) {
queriesToSend = append(queriesToSend, QueryToSend{
RawQuery: rawQuery,
QueryRef: q,
Metric: metric,
})
} else {
m := strings.Replace(metric.Metric, "*", ".*", -1)
......@@ -70,6 +71,7 @@ func (q *Query) Build(availableSeries []string) ([]QueryToSend, error) {
queriesToSend = append(queriesToSend, QueryToSend{
RawQuery: rawQuery,
QueryRef: q,
Metric: metric,
})
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment