Commit c63bbbae by Arve Knudsen Committed by GitHub

InfluxDB: Close Flux query results (#26917)

* InfluxDB: Drop ctxhttp usage
* InfluxDB: Clean up code
* InfluxDB: Close query results

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
parent d823b26b
......@@ -45,7 +45,6 @@ var ptc = proxyTransportCache{
func (ds *DataSource) GetHttpClient() (*http.Client, error) {
transport, err := ds.GetHttpTransport()
if err != nil {
return nil, err
}
......
......@@ -69,7 +69,7 @@ func getConverter(t string) (*data.FieldConverter, error) {
return &AnyToOptionalString, nil
}
return nil, fmt.Errorf("No matching converter found for [%v]", t)
return nil, fmt.Errorf("no matching converter found for [%v]", t)
}
// Init initializes the frame to be returned
......@@ -94,6 +94,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
if err != nil {
return err
}
fb.value = converter
fb.isTimeSeries = true
case isTag(col.Name()):
......@@ -106,6 +107,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
if col == nil {
return fmt.Errorf("no time column in timeSeries")
}
fb.timeColumn = col.Name()
fb.timeDisplay = "Time"
if "_time" != fb.timeColumn {
......@@ -118,6 +120,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
if err != nil {
return err
}
fb.columns = append(fb.columns, columnInfo{
name: col.Name(),
converter: converter,
......@@ -201,13 +204,14 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error {
if fb.isTimeSeries {
time, ok := record.ValueByKey(fb.timeColumn).(time.Time)
if !ok {
return fmt.Errorf("unable to get time colum: %s", fb.timeColumn)
return fmt.Errorf("unable to get time colum: %q", fb.timeColumn)
}
val, err := fb.value.Converter(record.Value())
if err != nil {
return err
}
fb.active.Fields[0].Append(time)
fb.active.Fields[1].Append(val)
} else {
......@@ -217,6 +221,7 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error {
if err != nil {
return err
}
fb.active.Fields[idx].Append(val)
}
}
......
......@@ -9,9 +9,9 @@ import (
"github.com/influxdata/influxdb-client-go/api"
)
// ExecuteQuery runs a flux query using the QueryModel to interpolate the query and the runner to execute it.
// executeQuery runs a flux query using the QueryModel to interpolate the query and the runner to execute it.
// maxSeries somehow limits the response.
func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) {
func executeQuery(ctx context.Context, query QueryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) {
dr = backend.DataResponse{}
flux, err := Interpolate(query)
......@@ -20,10 +20,11 @@ func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, max
return
}
glog.Debug("Flux", "interpolated query", flux)
glog.Debug("Executing Flux query", "interpolated query", flux)
tables, err := runner.runQuery(ctx, flux)
if err != nil {
glog.Warn("Flux query failed", "err", err, "query", flux)
dr.Error = err
metaFrame := data.NewFrame("meta for error")
metaFrame.Meta = &data.FrameMeta{
......@@ -32,6 +33,7 @@ func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, max
dr.Frames = append(dr.Frames, metaFrame)
return
}
defer tables.Close()
dr = readDataFrames(tables, int(float64(query.MaxDataPoints)*1.5), maxSeries)
......@@ -46,6 +48,7 @@ func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, max
}
func readDataFrames(result *api.QueryTableResult, maxPoints int, maxSeries int) (dr backend.DataResponse) {
glog.Debug("Reading data frames from query result", "maxPoints", maxPoints, "maxSeries", maxSeries)
dr = backend.DataResponse{}
builder := &FrameBuilder{
......@@ -69,7 +72,7 @@ func readDataFrames(result *api.QueryTableResult, maxPoints int, maxSeries int)
}
if builder.frames == nil {
dr.Error = fmt.Errorf("Invalid state")
dr.Error = fmt.Errorf("invalid state")
return dr
}
......
......@@ -55,7 +55,7 @@ func verifyGoldenResponse(name string) (*backend.DataResponse, error) {
testDataPath: name + ".csv",
}
dr := ExecuteQuery(context.Background(), QueryModel{MaxDataPoints: 100}, runner, 50)
dr := executeQuery(context.Background(), QueryModel{MaxDataPoints: 100}, runner, 50)
err := experimental.CheckGoldenDataResponse("./testdata/"+name+".golden.txt", &dr, true)
return &dr, err
}
......
......@@ -22,6 +22,7 @@ func init() {
// Query builds flux queries, executes them, and returns the results.
func Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
glog.Debug("Received a query", "query", *tsdbQuery)
tRes := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
......@@ -38,7 +39,7 @@ func Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQ
continue
}
res := ExecuteQuery(context.Background(), *qm, runner, 50)
res := executeQuery(context.Background(), *qm, runner, 50)
tRes.Results[query.RefId] = backendDataResponseToTSDBResponse(&res, query.RefId)
}
......@@ -57,9 +58,10 @@ type queryRunner interface {
runQuery(ctx context.Context, q string) (*api.QueryTableResult, error)
}
// runQuery executes fluxQuery against the Runner's organization and returns an flux typed result.
// runQuery executes fluxQuery against the Runner's organization and returns a Flux typed result.
func (r *Runner) runQuery(ctx context.Context, fluxQuery string) (*api.QueryTableResult, error) {
return r.client.QueryApi(r.org).Query(ctx, fluxQuery)
qa := r.client.QueryApi(r.org)
return qa.Query(ctx, fluxQuery)
}
// RunnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration).
......@@ -71,7 +73,7 @@ func RunnerFromDataSource(dsInfo *models.DataSource) (*Runner, error) {
url := dsInfo.Url
if url == "" {
return nil, fmt.Errorf("missing url from datasource configuration")
return nil, fmt.Errorf("missing URL from datasource configuration")
}
token, found := dsInfo.SecureJsonData.DecryptedValue("token")
if !found {
......
......@@ -54,9 +54,8 @@ func GetQueryModelTSDB(query *tsdb.Query, timeRange *tsdb.TimeRange, dsInfo *mod
return nil, fmt.Errorf("failed to re-encode the flux query into JSON: %w", err)
}
err = json.Unmarshal(queryBytes, &model)
if err != nil {
return nil, fmt.Errorf("error reading query: %s", err.Error())
if err := json.Unmarshal(queryBytes, &model); err != nil {
return nil, fmt.Errorf("error reading query: %w", err)
}
if model.Options.DefaultBucket == "" {
model.Options.DefaultBucket = dsInfo.JsonData.Get("defaultBucket").MustString("")
......
......@@ -15,7 +15,6 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
"golang.org/x/net/context/ctxhttp"
)
type InfluxDBExecutor struct {
......@@ -51,6 +50,8 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
return flux.Query(ctx, dsInfo, tsdbQuery)
}
glog.Debug("Making a non-Flux type query")
// NOTE: the following path is currently only called from alerting queries
// In dashboards, the request runs through proxy and are managed in the frontend
......@@ -68,7 +69,7 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
glog.Debug("Influxdb query", "raw query", rawQuery)
}
req, err := e.createRequest(dsInfo, rawQuery)
req, err := e.createRequest(ctx, dsInfo, rawQuery)
if err != nil {
return nil, err
}
......@@ -78,7 +79,7 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
return nil, err
}
resp, err := ctxhttp.Do(ctx, httpClient, req)
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
......@@ -91,12 +92,9 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
err = dec.Decode(&response)
if err != nil {
if err := dec.Decode(&response); err != nil {
return nil, err
}
if response.Err != nil {
return nil, response.Err
}
......@@ -109,43 +107,46 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource,
}
func (e *InfluxDBExecutor) getQuery(dsInfo *models.DataSource, queries []*tsdb.Query, context *tsdb.TsdbQuery) (*Query, error) {
if len(queries) == 0 {
return nil, fmt.Errorf("query request contains no queries")
}
// The model supports multiple queries, but right now this is only used from
// alerting so we only needed to support batch executing 1 query at a time.
if len(queries) > 0 {
query, err := e.QueryParser.Parse(queries[0].Model, dsInfo)
if err != nil {
return nil, err
}
return query, nil
}
return nil, fmt.Errorf("query request contains no queries")
}
func (e *InfluxDBExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) {
func (e *InfluxDBExecutor) createRequest(ctx context.Context, dsInfo *models.DataSource, query string) (*http.Request, error) {
u, err := url.Parse(dsInfo.Url)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, "query")
httpMode := dsInfo.JsonData.Get("httpMode").MustString("GET")
req, err := func() (*http.Request, error) {
var req *http.Request
switch httpMode {
case "GET":
return http.NewRequest(http.MethodGet, u.String(), nil)
req, err = http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
case "POST":
bodyValues := url.Values{}
bodyValues.Add("q", query)
body := bodyValues.Encode()
return http.NewRequest(http.MethodPost, u.String(), strings.NewReader(body))
default:
return nil, ErrInvalidHttpMode
}
}()
req, err = http.NewRequestWithContext(ctx, http.MethodPost, u.String(), strings.NewReader(body))
if err != nil {
return nil, err
}
default:
return nil, ErrInvalidHttpMode
}
req.Header.Set("User-Agent", "Grafana")
......
package influxdb
import (
"context"
"io/ioutil"
"net/url"
"testing"
......@@ -23,7 +24,8 @@ func TestInfluxDB(t *testing.T) {
ResponseParser: &ResponseParser{},
}
Convey("createRequest with GET httpMode", func() {
req, _ := e.createRequest(datasource, query)
req, err := e.createRequest(context.Background(), datasource, query)
So(err, ShouldBeNil)
Convey("as default", func() {
So(req.Method, ShouldEqual, "GET")
......@@ -41,7 +43,8 @@ func TestInfluxDB(t *testing.T) {
Convey("createRequest with POST httpMode", func() {
datasource.JsonData.Set("httpMode", "POST")
req, _ := e.createRequest(datasource, query)
req, err := e.createRequest(context.Background(), datasource, query)
So(err, ShouldBeNil)
Convey("method should be POST", func() {
So(req.Method, ShouldEqual, "POST")
......@@ -63,7 +66,7 @@ func TestInfluxDB(t *testing.T) {
Convey("createRequest with PUT httpMode", func() {
datasource.JsonData.Set("httpMode", "PUT")
_, err := e.createRequest(datasource, query)
_, err := e.createRequest(context.Background(), datasource, query)
Convey("should miserably fail", func() {
So(err, ShouldEqual, ErrInvalidHttpMode)
......
......@@ -151,14 +151,13 @@ func (*InfluxdbQueryParser) parseQueryPart(model *simplejson.Json) (*QueryPart,
func (qp *InfluxdbQueryParser) parseGroupBy(model *simplejson.Json) ([]*QueryPart, error) {
var result []*QueryPart
for _, groupObj := range model.Get("groupBy").MustArray() {
groupJson := simplejson.NewFromAny(groupObj)
queryPart, err := qp.parseQueryPart(groupJson)
if err != nil {
return nil, err
}
result = append(result, queryPart)
}
......
......@@ -16,7 +16,6 @@ var (
func (query *Query) Build(queryContext *tsdb.TsdbQuery) (string, error) {
var res string
if query.UseRawQuery && query.RawQuery != "" {
res = query.RawQuery
} else {
......
......@@ -134,9 +134,8 @@ func (r QueryDefinition) Render(query *Query, queryContext *tsdb.TsdbQuery, part
func NewQueryPart(typ string, params []string) (*QueryPart, error) {
def, exist := renders[typ]
if !exist {
return nil, fmt.Errorf("Missing query definition for %s", typ)
return nil, fmt.Errorf("missing query definition for %q", typ)
}
return &QueryPart{
......
......@@ -36,7 +36,6 @@ func (rp *ResponseParser) Parse(response *Response, query *Query) *tsdb.QueryRes
func (rp *ResponseParser) transformRows(rows []Row, queryResult *tsdb.QueryResult, query *Query) tsdb.TimeSeriesSlice {
var result tsdb.TimeSeriesSlice
for _, row := range rows {
for columnIndex, column := range row.Columns {
if column == "time" {
......@@ -104,7 +103,6 @@ func (rp *ResponseParser) formatSeriesName(row Row, column string, query *Query)
func (rp *ResponseParser) buildSeriesNameFromQuery(row Row, column string) string {
var tags []string
for k, v := range row.Tags {
tags = append(tags, fmt.Sprintf("%s: %s", k, v))
}
......@@ -118,9 +116,12 @@ func (rp *ResponseParser) buildSeriesNameFromQuery(row Row, column string) strin
}
func (rp *ResponseParser) parseTimepoint(valuePair []interface{}, valuePosition int) (tsdb.TimePoint, error) {
var value null.Float = rp.parseValue(valuePair[valuePosition])
value := rp.parseValue(valuePair[valuePosition])
timestampNumber, _ := valuePair[0].(json.Number)
timestampNumber, ok := valuePair[0].(json.Number)
if !ok {
return tsdb.TimePoint{}, fmt.Errorf("valuePair[0] has invalid type: %#v", valuePair[0])
}
timestamp, err := timestampNumber.Float64()
if err != nil {
return tsdb.TimePoint{}, err
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment