Commit 5f1f820b by Ryan McKinley Committed by GitHub

Influx: Support flux in the influx datasource (#25308)

* add flux 
* add token to datasource config editor
* add backend for flux
* make the interpolated query available in query inspector
* go mod tidy
* Chore: fixes a couple of strict null errors in influxdb plugin

Co-authored-by: kyle <kyle@grafana.com>
Co-authored-by: Lukas Siatka <lukasz.siatka@grafana.com>
parent c7aac1fd
......@@ -25,8 +25,8 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/go-stack/stack v1.8.0
github.com/gobwas/glob v0.2.3
github.com/golang/protobuf v1.3.4
github.com/google/go-cmp v0.3.1
github.com/golang/protobuf v1.4.0
github.com/google/go-cmp v0.4.0
github.com/gorilla/websocket v1.4.1
github.com/gosimple/slug v1.4.2
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
......@@ -35,6 +35,7 @@ require (
github.com/hashicorp/go-plugin v1.2.2
github.com/hashicorp/go-version v1.1.0
github.com/inconshreveable/log15 v0.0.0-20180818164646-67afb5ed74ec
github.com/influxdata/influxdb-client-go v1.1.1-0.20200511153144-e63a28ffeba7
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/jung-kurt/gofpdf v1.10.1
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect
......@@ -70,13 +71,11 @@ require (
github.com/yudai/pp v2.0.1+incompatible // indirect
go.uber.org/atomic v1.5.1 // indirect
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/tools v0.0.0-20191213221258-04c2e8eff935 // indirect
golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
google.golang.org/grpc v1.27.1
google.golang.org/grpc v1.29.1
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
gopkg.in/ini.v1 v1.46.0
......@@ -85,7 +84,7 @@ require (
gopkg.in/mail.v2 v2.3.1
gopkg.in/redis.v5 v5.2.9
gopkg.in/square/go-jose.v2 v2.4.1
gopkg.in/yaml.v2 v2.2.5
gopkg.in/yaml.v2 v2.2.8
xorm.io/core v0.7.3
xorm.io/xorm v0.8.1
)
package flux
import (
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
influxdb2 "github.com/influxdata/influxdb-client-go"
)
// Copied from: (Apache 2 license)
// https://github.com/influxdata/influxdb-client-go/blob/master/query.go#L30
const (
stringDatatype = "string"
doubleDatatype = "double"
boolDatatype = "bool"
longDatatype = "long"
uLongDatatype = "unsignedLong"
durationDatatype = "duration"
base64BinaryDataType = "base64Binary"
timeDatatypeRFC = "dateTime:RFC3339"
timeDatatypeRFCNano = "dateTime:RFC3339Nano"
)
type columnInfo struct {
name string
converter *data.FieldConverter
}
// This is an interface to help testing
type FrameBuilder struct {
tableId int64
active *data.Frame
frames []*data.Frame
value *data.FieldConverter
columns []columnInfo
labels []string
maxPoints int // max points in a series
maxSeries int // max number of series
totalSeries int
isTimeSeries bool
}
func isTag(schk string) bool {
return (schk != "result" && schk != "table" && schk[0] != '_')
}
func getConverter(t string) (*data.FieldConverter, error) {
switch t {
case stringDatatype:
return &AnyToOptionalString, nil
case timeDatatypeRFC:
return &Int64ToOptionalInt64, nil
case timeDatatypeRFCNano:
return &Int64ToOptionalInt64, nil
case durationDatatype:
return &Int64ToOptionalInt64, nil
case doubleDatatype:
return &Float64ToOptionalFloat64, nil
case boolDatatype:
return &BoolToOptionalBool, nil
case longDatatype:
return &Int64ToOptionalInt64, nil
case uLongDatatype:
return &UInt64ToOptionalUInt64, nil
case base64BinaryDataType:
return &AnyToOptionalString, nil
}
return nil, fmt.Errorf("No matching converter found for [%v]", t)
}
// Init initializes the frame to be returned
// fields points at entries in the frame, and provides easier access
// names indexes the columns encountered
func (fb *FrameBuilder) Init(metadata *influxdb2.FluxTableMetadata) error {
columns := metadata.Columns()
fb.frames = make([]*data.Frame, 0)
fb.tableId = -1
fb.value = nil
fb.columns = make([]columnInfo, 0)
fb.isTimeSeries = false
for _, col := range columns {
switch {
case col.Name() == "_value":
if fb.value != nil {
return fmt.Errorf("multiple values found")
}
converter, err := getConverter(col.DataType())
if err != nil {
return err
}
fb.value = converter
case col.Name() == "_measurement":
fb.isTimeSeries = true
case isTag(col.Name()):
fb.labels = append(fb.labels, col.Name())
}
}
if !fb.isTimeSeries {
fb.labels = make([]string, 0)
for _, col := range columns {
converter, err := getConverter(col.DataType())
if err != nil {
return err
}
fb.columns = append(fb.columns, columnInfo{
name: col.Name(),
converter: converter,
})
}
}
return nil
}
// Append appends a single entry from an influxdb2 record to a data frame
// Values are appended to _value
// Tags are appended as labels
// _measurement holds the dataframe name
// _field holds the field name.
func (fb *FrameBuilder) Append(record *influxdb2.FluxRecord) error {
table, ok := record.ValueByKey("table").(int64)
if ok && table != fb.tableId {
fb.totalSeries++
if fb.totalSeries > fb.maxSeries {
return fmt.Errorf("reached max series limit (%d)", fb.maxSeries)
}
if fb.isTimeSeries {
// Series Data
labels := make(map[string]string)
for _, name := range fb.labels {
labels[name] = record.ValueByKey(name).(string)
}
fb.active = data.NewFrame(
record.Measurement(),
data.NewFieldFromFieldType(data.FieldTypeTime, 0),
data.NewFieldFromFieldType(fb.value.OutputFieldType, 0),
)
fb.active.Fields[0].Name = "Time"
fb.active.Fields[1].Name = record.Field()
fb.active.Fields[1].Labels = labels
} else {
fields := make([]*data.Field, len(fb.columns))
for idx, col := range fb.columns {
fields[idx] = data.NewFieldFromFieldType(col.converter.OutputFieldType, 0)
fields[idx].Name = col.name
}
fb.active = data.NewFrame("", fields...)
}
fb.frames = append(fb.frames, fb.active)
fb.tableId = table
}
if fb.isTimeSeries {
val, err := fb.value.Converter(record.Value())
if err != nil {
return err
}
fb.active.Fields[0].Append(record.Time())
fb.active.Fields[1].Append(val)
} else {
// Table view
for idx, col := range fb.columns {
val, err := col.converter.Converter(record.ValueByKey(col.name))
if err != nil {
return err
}
fb.active.Fields[idx].Append(val)
}
}
if fb.active.Fields[0].Len() > fb.maxPoints {
return fmt.Errorf("returned too many points in a series: %d", fb.maxPoints)
}
return nil
}
package flux
import (
"regexp"
"testing"
)
var isField = regexp.MustCompile(`^_(time|value|measurement|field|start|stop)$`)
func TestColumnIdentification(t *testing.T) {
t.Run("Test Field Identification", func(t *testing.T) {
fieldNames := []string{"_time", "_value", "_measurement", "_field", "_start", "_stop"}
for _, item := range fieldNames {
if !isField.MatchString(item) {
t.Fatal("Field", item, "Expected field, but got false")
}
}
})
t.Run("Test Not Field Identification", func(t *testing.T) {
fieldNames := []string{"_header", "_cpu", "_hello", "_doctor"}
for _, item := range fieldNames {
if isField.MatchString(item) {
t.Fatal("Field", item, "Expected NOT a field, but got true")
}
}
})
t.Run("Test Tag Identification", func(t *testing.T) {
tagNames := []string{"header", "value", "tag"}
for _, item := range tagNames {
if !isTag(item) {
t.Fatal("Tag", item, "Expected tag, but got false")
}
}
})
t.Run("Test Special Case Tag Identification", func(t *testing.T) {
notTagNames := []string{"table", "result"}
for _, item := range notTagNames {
if isTag(item) {
t.Fatal("Special tag", item, "Expected NOT a tag, but got true")
}
}
})
}
package flux
import (
"fmt"
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// Int64NOOP .....
var Int64NOOP = data.FieldConverter{
OutputFieldType: data.FieldTypeInt64,
}
// BoolNOOP .....
var BoolNOOP = data.FieldConverter{
OutputFieldType: data.FieldTypeBool,
}
// Float64NOOP .....
var Float64NOOP = data.FieldConverter{
OutputFieldType: data.FieldTypeFloat64,
}
// StringNOOP value is already in the proper format
var StringNOOP = data.FieldConverter{
OutputFieldType: data.FieldTypeString,
}
// AnyToOptionalString any value as a string
var AnyToOptionalString = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableString,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
return nil, nil
}
str := fmt.Sprintf("%+v", v) // the +v adds field names
return &str, nil
},
}
// Float64ToOptionalFloat64 optional float value
var Float64ToOptionalFloat64 = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableFloat64,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
return nil, nil
}
val, ok := v.(float64)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[float] expected float64 input but got type %T", v)
}
return &val, nil
},
}
// Int64ToOptionalInt64 optional int value
var Int64ToOptionalInt64 = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableInt64,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
return nil, nil
}
val, ok := v.(int64)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[int] expected int64 input but got type %T", v)
}
return &val, nil
},
}
// UInt64ToOptionalUInt64 optional int value
var UInt64ToOptionalUInt64 = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableUint64,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
return nil, nil
}
val, ok := v.(uint64)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[uint] expected uint64 input but got type %T", v)
}
return &val, nil
},
}
// BoolToOptionalBool optional int value
var BoolToOptionalBool = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableBool,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
return nil, nil
}
val, ok := v.(bool)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[bool] expected bool input but got type %T", v)
}
return &val, nil
},
}
// RFC3339StringToNullableTime .....
func RFC3339StringToNullableTime(s string) (*time.Time, error) {
if s == "" {
return nil, nil
}
rv, err := time.Parse(time.RFC3339, s)
if err != nil {
return nil, err
}
u := rv.UTC()
return &u, nil
}
// StringToOptionalFloat64 string to float
var StringToOptionalFloat64 = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableFloat64,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
return nil, nil
}
val, ok := v.(string)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[floatz] expected string input but got type %T", v)
}
fV, err := strconv.ParseFloat(val, 64)
return &fV, err
},
}
// Float64EpochSecondsToTime numeric seconds to time
var Float64EpochSecondsToTime = data.FieldConverter{
OutputFieldType: data.FieldTypeTime,
Converter: func(v interface{}) (interface{}, error) {
fV, ok := v.(float64)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[seconds] expected float64 input but got type %T", v)
}
return time.Unix(int64(fV), 0).UTC(), nil
},
}
// Float64EpochMillisToTime convert to time
var Float64EpochMillisToTime = data.FieldConverter{
OutputFieldType: data.FieldTypeTime,
Converter: func(v interface{}) (interface{}, error) {
fV, ok := v.(float64)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[ms] expected float64 input but got type %T", v)
}
return time.Unix(0, int64(fV)*int64(time.Millisecond)).UTC(), nil
},
}
// Boolean ...
var Boolean = data.FieldConverter{
OutputFieldType: data.FieldTypeBool,
Converter: func(v interface{}) (interface{}, error) {
fV, ok := v.(bool)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[ms] expected bool input but got type %T", v)
}
return fV, nil
},
}
package flux
import (
"context"
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
influxdb2 "github.com/influxdata/influxdb-client-go"
)
// ExecuteQuery runs a flux query using the QueryModel to interpolate the query and the runner to execute it.
// maxSeries somehow limits the response.
func ExecuteQuery(ctx context.Context, query QueryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) {
dr = backend.DataResponse{}
flux, err := Interpolate(query)
if err != nil {
dr.Error = err
return
}
glog.Debug("Flux", "interpolated query", flux)
tables, err := runner.runQuery(ctx, flux)
if err != nil {
dr.Error = err
return
}
dr = readDataFrames(tables, int(float64(query.MaxDataPoints)*1.5), maxSeries)
for _, frame := range dr.Frames {
if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
}
frame.Meta.ExecutedQueryString = flux
}
return dr
}
func readDataFrames(result *influxdb2.QueryTableResult, maxPoints int, maxSeries int) (dr backend.DataResponse) {
dr = backend.DataResponse{}
builder := &FrameBuilder{
maxPoints: maxPoints,
maxSeries: maxSeries,
}
for result.Next() {
// Observe when there is new grouping key producing new table
if result.TableChanged() {
if builder.frames != nil {
for _, frame := range builder.frames {
dr.Frames = append(dr.Frames, frame)
}
}
err := builder.Init(result.TableMetadata())
if err != nil {
dr.Error = err
return
}
}
if builder.frames == nil {
dr.Error = fmt.Errorf("Invalid state")
return dr
}
err := builder.Append(result.Record())
if err != nil {
dr.Error = err
break
}
}
// Add the inprogress record
if builder.frames != nil {
for _, frame := range builder.frames {
dr.Frames = append(dr.Frames, frame)
}
}
// Attach any errors (may be null)
dr.Error = result.Err()
return dr
}
package flux
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go"
)
//--------------------------------------------------------------
// TestData -- reads result from saved files
//--------------------------------------------------------------
// MockRunner reads local file path for testdata.
type MockRunner struct {
testDataPath string
}
func (r *MockRunner) runQuery(ctx context.Context, q string) (*influxdb2.QueryTableResult, error) {
bytes, err := ioutil.ReadFile("./testdata/" + r.testDataPath)
if err != nil {
return nil, err
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
if r.Method == http.MethodPost {
w.WriteHeader(http.StatusOK)
_, _ = w.Write(bytes)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()
client := influxdb2.NewClient(server.URL, "a")
return client.QueryApi("x").Query(ctx, q)
}
func TestExecuteSimple(t *testing.T) {
ctx := context.Background()
t.Run("Simple Test", func(t *testing.T) {
runner := &MockRunner{
testDataPath: "simple.csv",
}
dr := ExecuteQuery(ctx, QueryModel{MaxDataPoints: 100}, runner, 50)
if dr.Error != nil {
t.Fatal(dr.Error)
}
if len(dr.Frames) != 1 {
t.Fatalf("Expected 1 frame, received [%d] frames", len(dr.Frames))
}
if !strings.Contains(dr.Frames[0].Name, "test") {
t.Fatalf("Frame must match _measurement column. Expected [%s] Got [%s]", "test", dr.Frames[0].Name)
}
if len(dr.Frames[0].Fields[1].Labels) != 2 {
t.Fatalf("Error parsing labels. Expected [%d] Got [%d]", 2, len(dr.Frames[0].Fields[1].Labels))
}
if dr.Frames[0].Fields[0].Name != "Time" {
t.Fatalf("Error parsing fields. Field 1 should always be time. Got name [%s]", dr.Frames[0].Fields[0].Name)
}
st, _ := dr.Frames[0].StringTable(-1, -1)
fmt.Println(st)
fmt.Println("----------------------")
})
}
func TestExecuteMultiple(t *testing.T) {
ctx := context.Background()
t.Run("Multiple Test", func(t *testing.T) {
runner := &MockRunner{
testDataPath: "multiple.csv",
}
dr := ExecuteQuery(ctx, QueryModel{MaxDataPoints: 100}, runner, 50)
if dr.Error != nil {
t.Fatal(dr.Error)
}
if len(dr.Frames) != 4 {
t.Fatalf("Expected 4 frames, received [%d] frames", len(dr.Frames))
}
if !strings.Contains(dr.Frames[0].Name, "test") {
t.Fatalf("Frame must include _measurement column. Expected [%s] Got [%s]", "test", dr.Frames[0].Name)
}
if len(dr.Frames[0].Fields[1].Labels) != 2 {
t.Fatalf("Error parsing labels. Expected [%d] Got [%d]", 2, len(dr.Frames[0].Fields[1].Labels))
}
if dr.Frames[0].Fields[0].Name != "Time" {
t.Fatalf("Error parsing fields. Field 1 should always be time. Got name [%s]", dr.Frames[0].Fields[0].Name)
}
st, _ := dr.Frames[0].StringTable(-1, -1)
fmt.Println(st)
fmt.Println("----------------------")
})
}
func TestExecuteGrouping(t *testing.T) {
ctx := context.Background()
t.Run("Grouping Test", func(t *testing.T) {
runner := &MockRunner{
testDataPath: "grouping.csv",
}
dr := ExecuteQuery(ctx, QueryModel{MaxDataPoints: 100}, runner, 50)
if dr.Error != nil {
t.Fatal(dr.Error)
}
if len(dr.Frames) != 3 {
t.Fatalf("Expected 3 frames, received [%d] frames", len(dr.Frames))
}
if !strings.Contains(dr.Frames[0].Name, "system") {
t.Fatalf("Frame must match _measurement column. Expected [%s] Got [%s]", "test", dr.Frames[0].Name)
}
if len(dr.Frames[0].Fields[1].Labels) != 1 {
t.Fatalf("Error parsing labels. Expected [%d] Got [%d]", 1, len(dr.Frames[0].Fields[1].Labels))
}
if dr.Frames[0].Fields[0].Name != "Time" {
t.Fatalf("Error parsing fields. Field 1 should always be time. Got name [%s]", dr.Frames[0].Fields[0].Name)
}
st, _ := dr.Frames[0].StringTable(-1, -1)
fmt.Println(st)
fmt.Println("----------------------")
})
}
func TestBuckets(t *testing.T) {
ctx := context.Background()
t.Run("Buckes", func(t *testing.T) {
runner := &MockRunner{
testDataPath: "buckets.csv",
}
dr := ExecuteQuery(ctx, QueryModel{MaxDataPoints: 100}, runner, 50)
if dr.Error != nil {
t.Fatal(dr.Error)
}
st, _ := dr.Frames[0].StringTable(-1, -1)
fmt.Println(st)
fmt.Println("----------------------")
})
}
package flux
import (
"context"
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
influxdb2 "github.com/influxdata/influxdb-client-go"
)
var (
glog log.Logger
)
func init() {
glog = log.New("tsdb.influx_flux")
}
// Query builds flux queries, executes them, and returns the results.
func Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
tRes := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
runner, err := RunnerFromDataSource(dsInfo)
if err != nil {
return nil, err
}
for _, query := range tsdbQuery.Queries {
qm, err := GetQueryModelTSDB(query, tsdbQuery.TimeRange, dsInfo)
if err != nil {
tRes.Results[query.RefId] = &tsdb.QueryResult{Error: err}
continue
}
res := ExecuteQuery(context.Background(), *qm, runner, 10)
tRes.Results[query.RefId] = backendDataResponseToTSDBResponse(&res, query.RefId)
}
return tRes, nil
}
// Runner is an influxdb2 Client with an attached org property and is used
// for running flux queries.
type Runner struct {
client influxdb2.Client
org string
}
// This is an interface to help testing
type queryRunner interface {
runQuery(ctx context.Context, q string) (*influxdb2.QueryTableResult, error)
}
// runQuery executes fluxQuery against the Runner's organization and returns an flux typed result.
func (r *Runner) runQuery(ctx context.Context, fluxQuery string) (*influxdb2.QueryTableResult, error) {
return r.client.QueryApi(r.org).Query(ctx, fluxQuery)
}
// RunnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration).
func RunnerFromDataSource(dsInfo *models.DataSource) (*Runner, error) {
org := dsInfo.JsonData.Get("organization").MustString("")
if org == "" {
return nil, fmt.Errorf("missing organization in datasource configuration")
}
url := dsInfo.Url
if url == "" {
return nil, fmt.Errorf("missing url from datasource configuration")
}
token, found := dsInfo.SecureJsonData.DecryptedValue("token")
if !found {
return nil, fmt.Errorf("token is missing from datasource configuration and is needed to use Flux")
}
return &Runner{
client: influxdb2.NewClient(url, token),
org: org,
}, nil
}
// backendDataResponseToTSDBResponse takes the SDK's style response and changes it into a
// tsdb.QueryResult. This is a wrapper so less of existing code needs to be changed. This should
// be able to be removed in the near future https://github.com/grafana/grafana/pull/25472.
func backendDataResponseToTSDBResponse(dr *backend.DataResponse, refID string) *tsdb.QueryResult {
qr := &tsdb.QueryResult{RefId: refID}
if dr.Error != nil {
qr.Error = dr.Error
return qr
}
if dr.Frames != nil {
qr.Dataframes = tsdb.NewDecodedDataFrames(dr.Frames)
}
return qr
}
package flux
import (
"fmt"
"regexp"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
const variableFilter = `(?m)([a-zA-Z]+)\.([a-zA-Z]+)`
// Interpolate processes macros
func Interpolate(query QueryModel) (string, error) {
flux := query.RawQuery
variableFilterExp, err := regexp.Compile(variableFilter)
matches := variableFilterExp.FindAllStringSubmatch(flux, -1)
if matches != nil {
timeRange := query.TimeRange
from := timeRange.From.UTC().Format(time.RFC3339)
to := timeRange.To.UTC().Format(time.RFC3339)
for _, match := range matches {
switch match[2] {
case "timeRangeStart":
flux = strings.ReplaceAll(flux, match[0], from)
case "timeRangeStop":
flux = strings.ReplaceAll(flux, match[0], to)
case "windowPeriod":
flux = strings.ReplaceAll(flux, match[0], query.Interval.String())
case "bucket":
flux = strings.ReplaceAll(flux, match[0], "\""+query.Options.Bucket+"\"")
case "defaultBucket":
flux = strings.ReplaceAll(flux, match[0], "\""+query.Options.DefaultBucket+"\"")
case "organization":
flux = strings.ReplaceAll(flux, match[0], "\""+query.Options.Organization+"\"")
}
}
}
backend.Logger.Info(fmt.Sprintf("%s => %v", flux, query.Options))
return flux, err
}
package flux
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
func TestInterpolate(t *testing.T) {
// Unix sec: 1500376552
// Unix ms: 1500376552001
timeRange := backend.TimeRange{
From: time.Unix(0, 0),
To: time.Unix(0, 0),
}
options := QueryOptions{
Organization: "grafana1",
Bucket: "grafana2",
DefaultBucket: "grafana3",
}
tests := []struct {
name string
before string
after string
}{
{
name: "interpolate flux variables",
before: `v.timeRangeStart, something.timeRangeStop, XYZ.bucket, uuUUu.defaultBucket, aBcDefG.organization, window.windowPeriod, a91{}.bucket`,
after: `1970-01-01T00:00:00Z, 1970-01-01T00:00:00Z, "grafana2", "grafana3", "grafana1", 1s, a91{}.bucket`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query := QueryModel{
RawQuery: tt.before,
Options: options,
TimeRange: timeRange,
MaxDataPoints: 1,
Interval: 1000 * 1000 * 1000,
}
interpolatedQuery, err := Interpolate(query)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(tt.after, interpolatedQuery); diff != "" {
t.Fatalf("Result mismatch (-want +got):\n%s", diff)
}
})
}
}
package flux
import (
"encoding/json"
"fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
)
// QueryOptions represents datasource configuration options
type QueryOptions struct {
Bucket string `json:"bucket"`
DefaultBucket string `json:"defaultBucket"`
Organization string `json:"organization"`
}
// QueryModel represents a spreadsheet query.
type QueryModel struct {
RawQuery string `json:"query"`
Options QueryOptions `json:"options"`
// Not from JSON
TimeRange backend.TimeRange `json:"-"`
MaxDataPoints int64 `json:"-"`
Interval time.Duration `json:"-"`
}
// The following is commented out but kept as it should be useful when
// restoring this code to be closer to the SDK's models.
// func GetQueryModel(query backend.DataQuery) (*QueryModel, error) {
// model := &QueryModel{}
// err := json.Unmarshal(query.JSON, &model)
// if err != nil {
// return nil, fmt.Errorf("error reading query: %s", err.Error())
// }
// // Copy directly from the well typed query
// model.TimeRange = query.TimeRange
// model.MaxDataPoints = query.MaxDataPoints
// model.Interval = query.Interval
// return model, nil
// }
// GetQueryModelTSDB builds a QueryModel from tsdb.Query information and datasource configuration (dsInfo).
func GetQueryModelTSDB(query *tsdb.Query, timeRange *tsdb.TimeRange, dsInfo *models.DataSource) (*QueryModel, error) {
model := &QueryModel{}
queryBytes, err := query.Model.Encode()
if err != nil {
return nil, fmt.Errorf("failed to re-encode the flux query into JSON: %w", err)
}
err = json.Unmarshal(queryBytes, &model)
if err != nil {
return nil, fmt.Errorf("error reading query: %s", err.Error())
}
if model.Options.DefaultBucket == "" {
model.Options.DefaultBucket = dsInfo.JsonData.Get("defaultBucket").MustString("")
}
if model.Options.Bucket == "" {
model.Options.Bucket = model.Options.DefaultBucket
}
if model.Options.Organization == "" {
model.Options.Organization = dsInfo.JsonData.Get("organization").MustString("")
}
startTime, err := timeRange.ParseFrom()
if err != nil {
return nil, err
}
endTime, err := timeRange.ParseTo()
if err != nil {
return nil, err
}
// Copy directly from the well typed query
model.TimeRange = backend.TimeRange{
From: startTime,
To: endTime,
}
model.MaxDataPoints = query.MaxDataPoints
model.Interval = time.Millisecond * time.Duration(query.IntervalMs)
return model, nil
}
#datatype,string,long,string,string,string,string,long
#group,false,false,false,false,true,false,false
#default,_result,,,,,,
,result,table,name,id,organizationID,retentionPolicy,retentionPeriod
,,0,grafana,059b46a59abab001,059b46a59abab000,,604800000000000
,,0,_tasks,059b46a59abab002,059b46a59abab000,,259200000000000
,,0,_monitoring,059b46a59abab003,059b46a59abab000,,604800000000000
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,string,string,string,double,dateTime:RFC3339
#group,false,false,true,true,true,true,true,false,false
#default,mean,,,,,,,,
,result,table,_start,_stop,_field,_measurement,host,_value,_time
,,0,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load1,system,hostname,,2020-05-05T18:38:50Z
,,0,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load1,system,hostname,3.56,2020-05-05T18:39:00Z
,,0,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load1,system,hostname,,2020-05-05T19:38:47.207881833Z
,,1,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load15,system,hostname,,2020-05-05T18:38:50Z
,,1,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load15,system,hostname,2.51,2020-05-05T18:39:00Z
,,1,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load15,system,hostname,1.74,2020-05-05T19:38:40Z
,,1,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load15,system,hostname,,2020-05-05T19:38:47.207881833Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,,2020-05-05T18:38:50Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,3.14,2020-05-05T18:39:00Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,3.04,2020-05-05T18:39:10Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,1.8,2020-05-05T19:37:50Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,1.76,2020-05-05T19:38:00Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,1.75,2020-05-05T19:38:10Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,1.71,2020-05-05T19:38:20Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,1.77,2020-05-05T19:38:30Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,1.71,2020-05-05T19:38:40Z
,,2,2020-05-05T18:38:47.207881833Z,2020-05-05T19:38:47.207881833Z,load5,system,hostname,,2020-05-05T19:38:47.207881833Z
\ No newline at end of file
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,4,i,test,1,adsfasdf
,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,-1,i,test,1,adsfasdf
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,bool,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.62797864Z,false,f,test,0,adsfasdf
,,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,true,f,test,0,adsfasdf
#datatype,string,long,dateTime:RFC3339Nano,dateTime:RFC3339Nano,dateTime:RFC3339Nano,unsignedLong,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,3,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.62797864Z,0,i,test,0,adsfasdf
,,3,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,2,i,test,0,adsfasdf
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#group,false,false,true,true,false,false,true,true,true,true
#default,_result,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
......@@ -14,6 +14,7 @@ import (
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
"golang.org/x/net/context/ctxhttp"
)
......@@ -42,8 +43,35 @@ func init() {
tsdb.RegisterTsdbQueryEndpoint("influxdb", NewInfluxDBExecutor)
}
func AllFlux(queries *tsdb.TsdbQuery) (bool, error) {
var hasFlux bool
var allFlux bool
for i, q := range queries.Queries {
qType := q.Model.Get("queryType").MustString("")
if qType == "Flux" {
hasFlux = true
if i == 0 && hasFlux {
allFlux = true
continue
}
}
if allFlux && qType != "Flux" {
return true, fmt.Errorf("when using flux, all queries must be a flux query")
}
}
return allFlux, nil
}
func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) {
result := &tsdb.Response{}
allFlux, err := AllFlux(tsdbQuery)
if err != nil {
return nil, err
}
if allFlux {
return flux.Query(ctx, dsInfo, tsdbQuery)
}
query, err := e.getQuery(dsInfo, tsdbQuery.Queries, tsdbQuery)
if err != nil {
......
......@@ -24,6 +24,21 @@ export class ConfigEditor extends PureComponent<Props> {
updateDatasourcePluginResetOption(this.props, 'password');
};
onResetToken = () => {
updateDatasourcePluginResetOption(this.props, 'token');
};
onToggleFlux = (event: React.SyntheticEvent<HTMLInputElement>) => {
const { options, onOptionsChange } = this.props;
onOptionsChange({
...options,
jsonData: {
...options.jsonData,
enableFlux: !options.jsonData.enableFlux,
},
});
};
render() {
const { options, onOptionsChange } = this.props;
const { secureJsonFields } = options;
......@@ -40,6 +55,50 @@ export class ConfigEditor extends PureComponent<Props> {
<h3 className="page-heading">InfluxDB Details</h3>
<div className="gf-form-group">
<div className="gf-form-inline">
<LegacyForms.Switch
label="Enable flux"
labelClass="width-10"
checked={options.jsonData.enableFlux || false}
onChange={this.onToggleFlux}
tooltip="Suport flux query endpoint"
/>
</div>
{options.jsonData.enableFlux && (
<>
<div className="gf-form-inline">
<div className="gf-form">
<InlineFormLabel className="width-10">Organization</InlineFormLabel>
<div className="width-10">
<Input
className="width-10"
placeholder="enter organization"
value={options.jsonData.organization || ''}
onChange={onUpdateDatasourceJsonDataOption(this.props, 'organization')}
/>
</div>
</div>
</div>
<div className="gf-form-inline">
<div className="gf-form">
<InlineFormLabel className="width-10">Default Bucket</InlineFormLabel>
<div className="width-10">
<Input
className="width-10"
placeholder="default bucket"
value={options.jsonData.defaultBucket || ''}
onChange={onUpdateDatasourceJsonDataOption(this.props, 'defaultBucket')}
/>
</div>
</div>
</div>
<br />
<br />
</>
)}
<div className="gf-form-inline">
<div className="gf-form">
<InlineFormLabel className="width-10">Database</InlineFormLabel>
<div className="width-20">
......@@ -78,6 +137,19 @@ export class ConfigEditor extends PureComponent<Props> {
</div>
<div className="gf-form-inline">
<div className="gf-form">
<SecretFormField
isConfigured={(secureJsonFields && secureJsonFields.token) as boolean}
value={secureJsonData.token || ''}
label="Token"
labelWidth={10}
inputWidth={20}
onReset={this.onResetPassword}
onChange={onUpdateDatasourceSecureJsonDataOption(this.props, 'token')}
/>
</div>
</div>
<div className="gf-form-inline">
<div className="gf-form">
<InlineFormLabel
className="width-10"
tooltip="You can use either GET or POST HTTP method to query your InfluxDB database. The POST
......
......@@ -44,6 +44,17 @@ exports[`Render should disable basic auth password input 1`] = `
<div
className="gf-form-inline"
>
<Switch
checked={false}
label="Enable flux"
labelClass="width-10"
onChange={[Function]}
tooltip="Suport flux query endpoint"
/>
</div>
<div
className="gf-form-inline"
>
<div
className="gf-form"
>
......@@ -107,6 +118,22 @@ exports[`Render should disable basic auth password input 1`] = `
<div
className="gf-form"
>
<SecretFormField
inputWidth={20}
label="Token"
labelWidth={10}
onChange={[Function]}
onReset={[Function]}
value=""
/>
</div>
</div>
<div
className="gf-form-inline"
>
<div
className="gf-form"
>
<Component
className="width-10"
tooltip="You can use either GET or POST HTTP method to query your InfluxDB database. The POST
......@@ -261,6 +288,17 @@ exports[`Render should hide basic auth fields when switch off 1`] = `
<div
className="gf-form-inline"
>
<Switch
checked={false}
label="Enable flux"
labelClass="width-10"
onChange={[Function]}
tooltip="Suport flux query endpoint"
/>
</div>
<div
className="gf-form-inline"
>
<div
className="gf-form"
>
......@@ -324,6 +362,22 @@ exports[`Render should hide basic auth fields when switch off 1`] = `
<div
className="gf-form"
>
<SecretFormField
inputWidth={20}
label="Token"
labelWidth={10}
onChange={[Function]}
onReset={[Function]}
value=""
/>
</div>
</div>
<div
className="gf-form-inline"
>
<div
className="gf-form"
>
<Component
className="width-10"
tooltip="You can use either GET or POST HTTP method to query your InfluxDB database. The POST
......@@ -478,6 +532,17 @@ exports[`Render should hide white listed cookies input when browser access chose
<div
className="gf-form-inline"
>
<Switch
checked={false}
label="Enable flux"
labelClass="width-10"
onChange={[Function]}
tooltip="Suport flux query endpoint"
/>
</div>
<div
className="gf-form-inline"
>
<div
className="gf-form"
>
......@@ -541,6 +606,22 @@ exports[`Render should hide white listed cookies input when browser access chose
<div
className="gf-form"
>
<SecretFormField
inputWidth={20}
label="Token"
labelWidth={10}
onChange={[Function]}
onReset={[Function]}
value=""
/>
</div>
</div>
<div
className="gf-form-inline"
>
<div
className="gf-form"
>
<Component
className="width-10"
tooltip="You can use either GET or POST HTTP method to query your InfluxDB database. The POST
......@@ -695,6 +776,17 @@ exports[`Render should render component 1`] = `
<div
className="gf-form-inline"
>
<Switch
checked={false}
label="Enable flux"
labelClass="width-10"
onChange={[Function]}
tooltip="Suport flux query endpoint"
/>
</div>
<div
className="gf-form-inline"
>
<div
className="gf-form"
>
......@@ -758,6 +850,22 @@ exports[`Render should render component 1`] = `
<div
className="gf-form"
>
<SecretFormField
inputWidth={20}
label="Token"
labelWidth={10}
onChange={[Function]}
onReset={[Function]}
value=""
/>
</div>
</div>
<div
className="gf-form-inline"
>
<div
className="gf-form"
>
<Component
className="width-10"
tooltip="You can use either GET or POST HTTP method to query your InfluxDB database. The POST
......
import _ from 'lodash';
import { dateMath, DataSourceApi, DataSourceInstanceSettings, ScopedVars } from '@grafana/data';
import { dateMath, DataSourceInstanceSettings, ScopedVars, DataQueryRequest, DataQueryResponse } from '@grafana/data';
import InfluxSeries from './influx_series';
import InfluxQueryModel from './influx_query_model';
import ResponseParser from './response_parser';
import { InfluxQueryBuilder } from './query_builder';
import { InfluxQuery, InfluxOptions } from './types';
import { getBackendSrv } from '@grafana/runtime';
import { TemplateSrv } from 'app/features/templating/template_srv';
import { InfluxQuery, InfluxOptions, InfluxQueryType } from './types';
import { getBackendSrv, getTemplateSrv, DataSourceWithBackend } from '@grafana/runtime';
import { Observable, from } from 'rxjs';
export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxOptions> {
export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery, InfluxOptions> {
type: string;
urls: any;
urls: string[];
username: string;
password: string;
name: string;
......@@ -21,17 +21,17 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
interval: any;
responseParser: any;
httpMode: string;
enableFlux: boolean;
/** @ngInject */
constructor(instanceSettings: DataSourceInstanceSettings<InfluxOptions>, private templateSrv: TemplateSrv) {
constructor(instanceSettings: DataSourceInstanceSettings<InfluxOptions>) {
super(instanceSettings);
this.type = 'influxdb';
this.urls = _.map(instanceSettings.url.split(','), url => {
return url.trim();
});
this.username = instanceSettings.username;
this.password = instanceSettings.password;
this.username = instanceSettings.username ?? '';
this.password = instanceSettings.password ?? '';
this.name = instanceSettings.name;
this.database = instanceSettings.database;
this.basicAuth = instanceSettings.basicAuth;
......@@ -40,15 +40,69 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
this.interval = settingsData.timeInterval;
this.httpMode = settingsData.httpMode || 'GET';
this.responseParser = new ResponseParser();
this.enableFlux = !!settingsData.enableFlux;
}
query(options: any) {
query(request: DataQueryRequest<InfluxQuery>): Observable<DataQueryResponse> {
let hasFlux = false;
let allFlux = true;
// Update the queryType fields and manage migrations
for (const target of request.targets) {
if (target.queryType === InfluxQueryType.Flux) {
hasFlux = true;
} else {
allFlux = false;
if (target.queryType === InfluxQueryType.Classic) {
delete target.rawQuery;
} else if (target.rawQuery) {
target.queryType = InfluxQueryType.InfluxQL;
} else if (target.queryType === InfluxQueryType.InfluxQL) {
target.rawQuery = true; // so the old version works
} else {
target.queryType = InfluxQueryType.Classic; // Explicitly set it to classic
delete target.rawQuery;
}
}
}
// Proces flux queries (data frame request)
if (hasFlux) {
if (!this.enableFlux) {
throw 'Flux not enabled for this datasource';
}
if (!allFlux) {
throw 'All queries must be flux';
}
// Calls /api/tsdb/query
return super.query(request);
}
// Fallback to classic query support
return from(this.classicQuery(request));
}
/**
* Only applied on flux queries
*/
applyTemplateVariables(query: InfluxQuery, scopedVars: ScopedVars): Record<string, any> {
return {
...query,
query: getTemplateSrv().replace(query.query, scopedVars), // The raw query text
};
}
/**
* The unchanged pre 7.1 query implementation
*/
async classicQuery(options: any): Promise<DataQueryResponse> {
let timeFilter = this.getTimeFilter(options);
const scopedVars = options.scopedVars;
const targets = _.cloneDeep(options.targets);
const queryTargets: any[] = [];
let queryModel: InfluxQueryModel;
let i, y;
const templateSrv = getTemplateSrv();
let allQueries = _.map(targets, target => {
if (target.hide) {
......@@ -60,7 +114,7 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
// backward compatibility
scopedVars.interval = scopedVars.__interval;
queryModel = new InfluxQueryModel(target, this.templateSrv, scopedVars);
queryModel = new InfluxQueryModel(target, templateSrv, scopedVars);
return queryModel.render(true);
}).reduce((acc, current) => {
if (current !== '') {
......@@ -74,7 +128,7 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
}
// add global adhoc filters to timeFilter
const adhocFilters = this.templateSrv.getAdhocFilters(this.name);
const adhocFilters = (templateSrv as any).getAdhocFilters(this.name);
if (adhocFilters.length > 0) {
timeFilter += ' AND ' + queryModel.renderAdhocFilters(adhocFilters);
}
......@@ -83,7 +137,7 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
scopedVars.timeFilter = { value: timeFilter };
// replace templated variables
allQueries = this.templateSrv.replace(allQueries, scopedVars);
allQueries = templateSrv.replace(allQueries, scopedVars);
return this._seriesQuery(allQueries, options).then((data: any): any => {
if (!data || !data.results) {
......@@ -100,7 +154,7 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
const target = queryTargets[i];
let alias = target.alias;
if (alias) {
alias = this.templateSrv.replace(target.alias, options.scopedVars);
alias = templateSrv.replace(target.alias, options.scopedVars);
}
const influxSeries = new InfluxSeries({
......@@ -136,7 +190,7 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
const timeFilter = this.getTimeFilter({ rangeRaw: options.rangeRaw, timezone: options.timezone });
let query = options.annotation.query.replace('$timeFilter', timeFilter);
query = this.templateSrv.replace(query, null, 'regex');
query = getTemplateSrv().replace(query, null, 'regex');
return this._seriesQuery(query, options).then((data: any) => {
if (!data || !data.results || !data.results[0]) {
......@@ -150,16 +204,18 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
}
targetContainsTemplate(target: any) {
const templateSrv = getTemplateSrv() as any; // :(
for (const group of target.groupBy) {
for (const param of group.params) {
if (this.templateSrv.variableExists(param)) {
if (templateSrv.variableExists(param)) {
return true;
}
}
}
for (const i in target.tags) {
if (this.templateSrv.variableExists(target.tags[i].value)) {
if (templateSrv.variableExists(target.tags[i].value)) {
return true;
}
}
......@@ -174,23 +230,25 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
let expandedQueries = queries;
if (queries && queries.length > 0) {
const templateSrv = getTemplateSrv();
expandedQueries = queries.map(query => {
const expandedQuery = {
...query,
datasource: this.name,
measurement: this.templateSrv.replace(query.measurement, scopedVars, 'regex'),
policy: this.templateSrv.replace(query.policy, scopedVars, 'regex'),
measurement: templateSrv.replace(query.measurement ?? '', scopedVars, 'regex'),
policy: templateSrv.replace(query.policy ?? '', scopedVars, 'regex'),
};
if (query.rawQuery) {
expandedQuery.query = this.templateSrv.replace(query.query, scopedVars, 'regex');
expandedQuery.query = templateSrv.replace(query.query ?? '', scopedVars, 'regex');
}
if (query.tags) {
const expandedTags = query.tags.map(tag => {
const expandedTag = {
...tag,
value: this.templateSrv.replace(tag.value, null, 'regex'),
value: templateSrv.replace(tag.value, null, 'regex'),
};
return expandedTag;
});
......@@ -203,7 +261,7 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
}
metricFindQuery(query: string, options?: any) {
const interpolated = this.templateSrv.replace(query, null, 'regex');
const interpolated = getTemplateSrv().replace(query, null, 'regex');
return this._seriesQuery(interpolated, options).then(resp => {
return this.responseParser.parse(query, resp);
......@@ -253,6 +311,7 @@ export default class InfluxDatasource extends DataSourceApi<InfluxQuery, InfluxO
).join('&');
}
// TODO: remove this so that everything gets sent to /healthcheck!
testDatasource() {
const queryBuilder = new InfluxQueryBuilder({ measurement: '', tags: [] }, this.database);
const query = queryBuilder.buildExploreQuery('RETENTION POLICIES');
......
......@@ -3,7 +3,7 @@ import queryPart from './query_part';
import kbn from 'app/core/utils/kbn';
import { InfluxQuery, InfluxQueryTag } from './types';
import { ScopedVars } from '@grafana/data';
import { TemplateSrv } from 'app/features/templating/template_srv';
import { TemplateSrv } from '@grafana/runtime';
export default class InfluxQueryModel {
target: InfluxQuery;
......
<query-editor-row query-ctrl="ctrl" can-collapse="true" has-text-edit-mode="true">
<div ng-if="ctrl.target.rawQuery">
<div ng-if="ctrl.datasource.enableFlux" class="gf-form-inline">
<div class="gf-form">
<label class="gf-form-label query-keyword width-7">QUERY</label>
<div class="gf-form-select-wrapper">
<select
class="gf-form-input gf-size-auto"
ng-model="ctrl.target.queryType"
ng-options="f.value as f.text for f in ctrl.queryTypes"
ng-change="ctrl.refresh()"
></select>
</div>
</div>
<div class="gf-form gf-form--grow">
<div class="gf-form-label gf-form-label--grow"></div>
</div>
</div>
<!-- TODO use monaco flux editor -->
<div ng-if="ctrl.target.queryType === 'Flux'">
<div class="gf-form">
<textarea
rows="3"
class="gf-form-input"
ng-model="ctrl.target.query"
spellcheck="false"
placeholder="Flux Query"
ng-model-onblur
ng-change="ctrl.refresh()"
></textarea>
</div>
</div>
<div ng-if="ctrl.target.queryType === 'InfluxQL' || !ctrl.target.queryType">
<div class="gf-form">
<textarea
rows="3"
......@@ -40,7 +72,7 @@
</div>
</div>
<div ng-if="!ctrl.target.rawQuery">
<div ng-if="ctrl.target.queryType === 'Classic'">
<div class="gf-form-inline">
<div class="gf-form">
<label class="gf-form-label query-keyword width-7">FROM</label>
......
......@@ -5,6 +5,7 @@ import InfluxQueryModel from './influx_query_model';
import queryPart from './query_part';
import { QueryCtrl } from 'app/plugins/sdk';
import { TemplateSrv } from 'app/features/templating/template_srv';
import { InfluxQueryType } from './types';
export class InfluxQueryCtrl extends QueryCtrl {
static templateUrl = 'partials/query.editor.html';
......@@ -13,6 +14,7 @@ export class InfluxQueryCtrl extends QueryCtrl {
queryBuilder: any;
groupBySegment: any;
resultFormats: any[];
queryTypes: any[];
orderByTime: any[];
policySegment: any;
tagSegments: any[];
......@@ -36,6 +38,16 @@ export class InfluxQueryCtrl extends QueryCtrl {
{ text: 'Time series', value: 'time_series' },
{ text: 'Table', value: 'table' },
];
// Show a dropdown for flux
if (this.datasource.enableFlux) {
this.queryTypes = [
{ text: 'Classic', value: InfluxQueryType.Classic },
{ text: 'InfluxQL', value: InfluxQueryType.InfluxQL },
{ text: 'Flux', value: InfluxQueryType.Flux },
];
}
this.policySegment = uiSegmentSrv.newSegment(this.target.policy);
if (!this.target.measurement) {
......@@ -236,12 +248,21 @@ export class InfluxQueryCtrl extends QueryCtrl {
}
toggleEditorMode() {
console.log('Toggle influx edit mode:', this.target);
try {
this.target.query = this.queryModel.render(false);
} catch (err) {
console.log('query render error');
}
this.target.rawQuery = !this.target.rawQuery;
const { queryType } = this.target;
if (queryType === InfluxQueryType.Flux || queryType === queryType.InfluxQL) {
this.target.queryType = InfluxQueryType.Classic;
this.target.rawQuery = false;
} else if (this.datasource.enableFlux) {
this.target.queryType = InfluxQueryType.Flux;
} else {
this.target.queryType = InfluxQueryType.InfluxQL;
}
}
getMeasurements(measurementFilter: any) {
......
......@@ -3,15 +3,17 @@ import InfluxDatasource from '../datasource';
import { TemplateSrvStub } from 'test/specs/helpers';
import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__
//@ts-ignore
const templateSrv = new TemplateSrvStub();
jest.mock('@grafana/runtime', () => ({
...jest.requireActual('@grafana/runtime'),
getBackendSrv: () => backendSrv,
getTemplateSrv: () => templateSrv,
}));
describe('InfluxDataSource', () => {
const ctx: any = {
//@ts-ignore
templateSrv: new TemplateSrvStub(),
instanceSettings: { url: 'url', name: 'influxDb', jsonData: { httpMode: 'GET' } },
};
......@@ -20,7 +22,7 @@ describe('InfluxDataSource', () => {
beforeEach(() => {
jest.clearAllMocks();
ctx.instanceSettings.url = '/api/datasources/proxy/1';
ctx.ds = new InfluxDatasource(ctx.instanceSettings, ctx.templateSrv);
ctx.ds = new InfluxDatasource(ctx.instanceSettings);
});
describe('When issuing metricFindQuery', () => {
......@@ -108,7 +110,7 @@ describe('InfluxDataSource', () => {
});
try {
await ctx.ds.query(queryOptions);
await ctx.ds.query(queryOptions).toPromise();
} catch (err) {
expect(err.message).toBe('InfluxDB Error: Query timeout');
}
......@@ -117,14 +119,12 @@ describe('InfluxDataSource', () => {
describe('InfluxDataSource in POST query mode', () => {
const ctx: any = {
//@ts-ignore
templateSrv: new TemplateSrvStub(),
instanceSettings: { url: 'url', name: 'influxDb', jsonData: { httpMode: 'POST' } },
};
beforeEach(() => {
ctx.instanceSettings.url = '/api/datasources/proxy/1';
ctx.ds = new InfluxDatasource(ctx.instanceSettings, ctx.templateSrv);
ctx.ds = new InfluxDatasource(ctx.instanceSettings);
});
describe('When issuing metricFindQuery', () => {
......
......@@ -3,10 +3,17 @@ import { DataQuery, DataSourceJsonData } from '@grafana/data';
export interface InfluxOptions extends DataSourceJsonData {
timeInterval: string;
httpMode: string;
// Influx 2.0
enableFlux?: boolean;
organization?: string;
defaultBucket?: string;
maxSeries?: number;
}
export interface InfluxSecureJsonData {
password?: string;
token?: string;
}
export interface InfluxQueryPart {
......@@ -22,7 +29,14 @@ export interface InfluxQueryTag {
value: string;
}
export enum InfluxQueryType {
Classic = 'Classic', // IFQL query builder
InfluxQL = 'InfluxQL', // raw ifql
Flux = 'Flux',
}
export interface InfluxQuery extends DataQuery {
queryType?: InfluxQueryType;
policy?: string;
measurement?: string;
resultFormat?: 'time_series' | 'table';
......@@ -34,6 +48,6 @@ export interface InfluxQuery extends DataQuery {
slimit?: string;
tz?: string;
fill?: string;
rawQuery?: boolean;
rawQuery?: boolean; // deprecated (use raw InfluxQL)
query?: string;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment