Commit d8aa38fa by bergquist

tech(influxdb): refactor and cleanup

parent ab875176
......@@ -3,6 +3,8 @@ package conditions
import (
"testing"
"gopkg.in/guregu/null.v3"
"github.com/grafana/grafana/pkg/tsdb"
. "github.com/smartystreets/goconvey/convey"
)
......@@ -43,7 +45,7 @@ func testReducer(typ string, datapoints ...float64) float64 {
}
for idx := range datapoints {
series.Points = append(series.Points, tsdb.NewTimePoint(datapoints[idx], 1234134))
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(datapoints[idx]), 1234134))
}
return reducer.Reduce(series).Float64
......
......@@ -10,8 +10,6 @@ import (
"path"
"time"
"gopkg.in/guregu/null.v3"
"golang.org/x/net/context/ctxhttp"
"github.com/grafana/grafana/pkg/log"
......@@ -21,14 +19,14 @@ import (
type InfluxDBExecutor struct {
*tsdb.DataSourceInfo
QueryParser *InfluxdbQueryParser
QueryBuilder *QueryBuild
QueryBuilder *QueryBuilder
}
func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
return &InfluxDBExecutor{
DataSourceInfo: dsInfo,
QueryParser: &InfluxdbQueryParser{},
QueryBuilder: &QueryBuild{},
QueryBuilder: &QueryBuilder{},
}
}
......@@ -66,52 +64,55 @@ func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.Query
return rawQuery, nil
}
return "", fmt.Errorf("Tsdb request contains no queries")
return "", fmt.Errorf("query request contains no queries")
}
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
query, err := e.getQuery(queries, context)
if err != nil {
result.Error = err
return result
}
glog.Info("Influxdb", "query", query)
func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) {
u, _ := url.Parse(e.Url)
u.Path = path.Join(u.Path, "query")
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
result.Error = err
return result
return nil, err
}
params := req.URL.Query()
params.Set("q", query)
params.Set("db", e.Database)
params.Set("epoch", "s")
req.URL.RawQuery = params.Encode()
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", "Grafana")
if e.BasicAuth {
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
}
glog.Info("influxdb request", "url", req.URL.String())
glog.Debug("influxdb request", "url", req.URL.String())
return req, nil
}
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
query, err := e.getQuery(queries, context)
if err != nil {
return result.WithError(err)
}
glog.Debug("Influxdb query", "raw query", query)
req, err := e.createRequest(query)
if err != nil {
return result.WithError(err)
}
resp, err := ctxhttp.Do(ctx, HttpClient, req)
if err != nil {
result.Error = err
return result
return result.WithError(err)
}
if resp.StatusCode/100 != 2 {
result.Error = fmt.Errorf("Influxdb returned statuscode %v body %v", resp.Status)
return result
return result.WithError(fmt.Errorf("Influxdb returned statuscode invalid status code: %v", resp.Status))
}
var response Response
......@@ -119,73 +120,11 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
dec.UseNumber()
err = dec.Decode(&response)
if err != nil {
glog.Error("Influxdb decode failed", "err", err)
result.Error = err
return result
return result.WithError(err)
}
result.QueryResults = make(map[string]*tsdb.QueryResult)
queryRes := tsdb.NewQueryResult()
for _, v := range response.Results {
for _, r := range v.Series {
serie := tsdb.TimeSeries{Name: r.Name}
var points tsdb.TimeSeriesPoints
for _, k := range r.Values {
var value null.Float
var err error
num, ok := k[1].(json.Number)
if !ok {
value = null.FloatFromPtr(nil)
} else {
fvalue, err := num.Float64()
if err == nil {
value = null.FloatFrom(fvalue)
}
}
pos0, ok := k[0].(json.Number)
timestamp, err := pos0.Float64()
if err == nil && ok {
points = append(points, tsdb.NewTimePoint(value, timestamp))
} else {
glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64)
}
serie.Points = points
}
queryRes.Series = append(queryRes.Series, &serie)
}
}
for _, v := range queryRes.Series {
glog.Info("result", "name", v.Name, "points", v.Points)
}
result.QueryResults["A"] = queryRes
result.QueryResults["A"] = ParseQueryResult(&response)
return result
}
type Response struct {
Results []Result
Err error
}
type Result struct {
Series []Row
Messages []*Message
Err error
}
type Message struct {
Level string `json:"level,omitempty"`
Text string `json:"text,omitempty"`
}
type Row struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
}
......@@ -12,7 +12,7 @@ func TestInfluxdbQueryParser(t *testing.T) {
parser := &InfluxdbQueryParser{}
Convey("converting metric name", func() {
Convey("can parse influxdb json model", func() {
json := `
{
"dsType": "influxdb",
......
......@@ -23,3 +23,26 @@ type Select []QueryPart
type InfluxDbSelect struct {
Type string
}
type Response struct {
Results []Result
Err error
}
type Result struct {
Series []Row
Messages []*Message
Err error
}
type Message struct {
Level string `json:"level,omitempty"`
Text string `json:"text,omitempty"`
}
type Row struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
}
......@@ -7,7 +7,7 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
type QueryBuild struct{}
type QueryBuilder struct{}
func renderTags(query *Query) []string {
var res []string
......@@ -29,7 +29,23 @@ func renderTags(query *Query) []string {
return res
}
func (*QueryBuild) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) {
func (*QueryBuilder) Build(query *Query, queryContext *tsdb.QueryContext) (string, error) {
res := renderSelectors(query)
res += renderMeasurement(query)
res += renderWhereClause(query)
res += renderTimeFilter(query)
res += renderGroupBy(query)
return res, nil
}
func renderTimeFilter(query *Query) string {
//res += "$timeFilter"
//res += "time > now() -" + strings.Replace(queryContext.TimeRange.From, "now", "", 1)
return "time > now() - 5m"
}
func renderSelectors(query *Query) string {
res := "SELECT "
var selectors []string
......@@ -41,34 +57,40 @@ func (*QueryBuild) Build(query *Query, queryContext *tsdb.QueryContext) (string,
}
selectors = append(selectors, stk)
}
res += strings.Join(selectors, ", ")
return res + strings.Join(selectors, ", ")
}
func renderMeasurement(query *Query) string {
policy := ""
if query.Policy == "" || query.Policy == "default" {
policy = ""
} else {
policy = `"` + query.Policy + `".`
}
res += fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement)
return fmt.Sprintf(` FROM %s"%s"`, policy, query.Measurement)
}
res += " WHERE "
func renderWhereClause(query *Query) string {
res := " WHERE "
conditions := renderTags(query)
res += strings.Join(conditions, " ")
if len(conditions) > 0 {
res += " AND "
}
//res += "$timeFilter"
res += "time > " + strings.Replace(queryContext.TimeRange.From, "now", "now()", 1)
return res
}
func renderGroupBy(query *Query) string {
var groupBy []string
for _, group := range query.GroupBy {
groupBy = append(groupBy, group.Render(""))
}
if len(groupBy) > 0 {
res += " GROUP BY " + strings.Join(groupBy, " ")
return " GROUP BY " + strings.Join(groupBy, " ")
}
return res, nil
return ""
}
......@@ -9,7 +9,7 @@ import (
func TestInfluxdbQueryBuilder(t *testing.T) {
Convey("Influxdb query builder", t, func() {
builder := QueryBuild{}
builder := QueryBuilder{}
qp1, _ := NewQueryPart("field", []string{"value"})
qp2, _ := NewQueryPart("mean", []string{})
......@@ -21,10 +21,10 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"}
queryContext := &tsdb.QueryContext{
TimeRange: tsdb.NewTimeRange("now-5h", "now"),
TimeRange: tsdb.NewTimeRange("now-5m", "now"),
}
Convey("can build query", func() {
Convey("can build simple query", func() {
query := &Query{
Selects: []*Select{{*qp1, *qp2}},
Measurement: "cpu",
......@@ -35,10 +35,10 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
rawQuery, err := builder.Build(query, queryContext)
So(err, ShouldBeNil)
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now()-5h GROUP BY time(10s) fill(null)`)
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now() - 5m GROUP BY time(10s) fill(null)`)
})
Convey("can asd query", func() {
Convey("can build query with group bys", func() {
query := &Query{
Selects: []*Select{{*qp1, *qp2}},
Measurement: "cpu",
......@@ -49,7 +49,7 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
rawQuery, err := builder.Build(query, queryContext)
So(err, ShouldBeNil)
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now()-5h GROUP BY time(10s)`)
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE "hostname" = 'server1' OR "hostname" = 'server2' AND time > now() - 5m GROUP BY time(10s)`)
})
})
}
......@@ -7,9 +7,9 @@ import (
)
func TestInfluxdbQueryPart(t *testing.T) {
Convey("Influxdb query part builder", t, func() {
Convey("Influxdb query parts", t, func() {
Convey("should handle field renderer parts", func() {
Convey("render field ", func() {
part, err := NewQueryPart("field", []string{"value"})
So(err, ShouldBeNil)
......@@ -17,7 +17,7 @@ func TestInfluxdbQueryPart(t *testing.T) {
So(res, ShouldEqual, `"value"`)
})
Convey("should handle nested function parts", func() {
Convey("render nested part", func() {
part, err := NewQueryPart("derivative", []string{"10s"})
So(err, ShouldBeNil)
......@@ -25,7 +25,7 @@ func TestInfluxdbQueryPart(t *testing.T) {
So(res, ShouldEqual, "derivative(mean(value), 10s)")
})
Convey("bottom", func() {
Convey("render bottom", func() {
part, err := NewQueryPart("bottom", []string{"3"})
So(err, ShouldBeNil)
......@@ -33,7 +33,7 @@ func TestInfluxdbQueryPart(t *testing.T) {
So(res, ShouldEqual, "bottom(value, 3)")
})
Convey("time", func() {
Convey("render time", func() {
part, err := NewQueryPart("time", []string{"$interval"})
So(err, ShouldBeNil)
......@@ -41,7 +41,7 @@ func TestInfluxdbQueryPart(t *testing.T) {
So(res, ShouldEqual, "time(10s)")
})
Convey("should nest spread function", func() {
Convey("render spread", func() {
part, err := NewQueryPart("spread", []string{})
So(err, ShouldBeNil)
......@@ -49,7 +49,7 @@ func TestInfluxdbQueryPart(t *testing.T) {
So(res, ShouldEqual, `spread(value)`)
})
Convey("should handle suffix parts", func() {
Convey("render suffix", func() {
part, err := NewQueryPart("math", []string{"/ 100"})
So(err, ShouldBeNil)
......@@ -57,7 +57,7 @@ func TestInfluxdbQueryPart(t *testing.T) {
So(res, ShouldEqual, "mean(value) / 100")
})
Convey("should handle alias parts", func() {
Convey("render alias", func() {
part, err := NewQueryPart("alias", []string{"test"})
So(err, ShouldBeNil)
......
package influxdb
import (
"encoding/json"
"github.com/grafana/grafana/pkg/tsdb"
"gopkg.in/guregu/null.v3"
)
func ParseQueryResult(response *Response) *tsdb.QueryResult {
queryRes := tsdb.NewQueryResult()
for _, v := range response.Results {
for _, r := range v.Series {
serie := tsdb.TimeSeries{Name: r.Name}
var points tsdb.TimeSeriesPoints
for _, k := range r.Values {
var value null.Float
var err error
num, ok := k[1].(json.Number)
if !ok {
value = null.FloatFromPtr(nil)
} else {
fvalue, err := num.Float64()
if err == nil {
value = null.FloatFrom(fvalue)
}
}
pos0, ok := k[0].(json.Number)
timestamp, err := pos0.Float64()
if err == nil && ok {
points = append(points, tsdb.NewTimePoint(value, timestamp))
} else {
//glog.Error("Failed to convert response", "err1", err, "ok", ok, "timestamp", timestamp, "value", value.Float64)
}
serie.Points = points
}
queryRes.Series = append(queryRes.Series, &serie)
}
}
for _, v := range queryRes.Series {
glog.Info("result", "name", v.Name, "points", v.Points)
}
return queryRes
}
package influxdb
import (
"encoding/json"
"testing"
"github.com/grafana/grafana/pkg/setting"
. "github.com/smartystreets/goconvey/convey"
)
func TestInfluxdbResponseParser(t *testing.T) {
Convey("Influxdb response parser", t, func() {
setting.NewConfigContext(&setting.CommandLineArgs{
HomePath: "../../../",
})
response := &Response{
Results: []Result{
Result{
Series: []Row{
{
Name: "cpu",
Columns: []string{"time", "mean", "sum"},
Values: [][]interface{}{
{json.Number("123"), json.Number("123"), json.Number("123")},
{json.Number("123"), json.Number("123"), json.Number("123")},
{json.Number("123"), json.Number("123"), json.Number("123")},
{json.Number("123"), json.Number("123"), json.Number("123")},
{json.Number("123"), json.Number("123"), json.Number("123")},
{json.Number("123"), json.Number("123"), json.Number("123")},
{json.Number("123"), json.Number("123"), json.Number("123")},
{json.Number("123"), json.Number("123"), json.Number("123")},
{json.Number("123"), json.Number("123"), json.Number("123")},
{json.Number("123"), json.Number("123"), json.Number("123")},
},
},
},
},
},
}
Convey("can parse response", func() {
result := ParseQueryResult(response)
So(len(result.Series), ShouldEqual, 1)
So(len(result.Series[0].Points), ShouldEqual, 10)
})
})
}
......@@ -52,6 +52,11 @@ type BatchResult struct {
Timings *BatchTiming
}
func (br *BatchResult) WithError(err error) *BatchResult {
br.Error = err
return br
}
type QueryResult struct {
Error error `json:"error"`
RefId string `json:"refId"`
......
......@@ -52,12 +52,12 @@ func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlic
client, err := e.getClient()
if err != nil {
return resultWithError(result, err)
return result.WithError(err)
}
query, err := parseQuery(queries, queryContext)
if err != nil {
return resultWithError(result, err)
return result.WithError(err)
}
timeRange := prometheus.Range{
......@@ -69,12 +69,12 @@ func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlic
value, err := client.QueryRange(ctx, query.Expr, timeRange)
if err != nil {
return resultWithError(result, err)
return result.WithError(err)
}
queryResult, err := parseResponse(value, query)
if err != nil {
return resultWithError(result, err)
return result.WithError(err)
}
result.QueryResults = queryResult
return result
......@@ -157,7 +157,8 @@ func parseResponse(value pmodel.Value, query *PrometheusQuery) (map[string]*tsdb
return queryResults, nil
}
/*
func resultWithError(result *tsdb.BatchResult, err error) *tsdb.BatchResult {
result.Error = err
return result
}
}*/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment