Commit 0e1e8565 by Arve Knudsen Committed by GitHub

InfluxDB: Upgrade InfluxDB in devenv (#26983)

* InfluxDB: Upgrade InfluxDB in devenv
* InfluxDB: De-export symbols
* InfluxDB: Remove unused code
* devenv: Make InfluxDB version configurable

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
parent 689ca1be
influxdb_version=1.8.1-alpine
influxdb:
image: influxdb:1.7.6
image: influxdb:${influxdb_version}
container_name: influxdb
ports:
- '2004:2004'
......
......@@ -27,8 +27,8 @@ type columnInfo struct {
converter *data.FieldConverter
}
// FrameBuilder This is an interface to help testing
type FrameBuilder struct {
// frameBuilder is an interface to help testing.
type frameBuilder struct {
tableID int64
active *data.Frame
frames []*data.Frame
......@@ -50,23 +50,23 @@ func isTag(schk string) bool {
func getConverter(t string) (*data.FieldConverter, error) {
switch t {
case stringDatatype:
return &AnyToOptionalString, nil
return &anyToOptionalString, nil
case timeDatatypeRFC:
return &TimeToOptionalTime, nil
return &timeToOptionalTime, nil
case timeDatatypeRFCNano:
return &TimeToOptionalTime, nil
return &timeToOptionalTime, nil
case durationDatatype:
return &Int64ToOptionalInt64, nil
return &int64ToOptionalInt64, nil
case doubleDatatype:
return &Float64ToOptionalFloat64, nil
return &float64ToOptionalFloat64, nil
case boolDatatype:
return &BoolToOptionalBool, nil
return &boolToOptionalBool, nil
case longDatatype:
return &Int64ToOptionalInt64, nil
return &int64ToOptionalInt64, nil
case uLongDatatype:
return &UInt64ToOptionalUInt64, nil
return &uint64ToOptionalUInt64, nil
case base64BinaryDataType:
return &AnyToOptionalString, nil
return &anyToOptionalString, nil
}
return nil, fmt.Errorf("no matching converter found for [%v]", t)
......@@ -75,7 +75,7 @@ func getConverter(t string) (*data.FieldConverter, error) {
// Init initializes the frame to be returned
// fields points at entries in the frame, and provides easier access
// names indexes the columns encountered
func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
func (fb *frameBuilder) Init(metadata *query.FluxTableMetadata) error {
columns := metadata.Columns()
fb.frames = make([]*data.Frame, 0)
fb.tableID = -1
......@@ -153,7 +153,7 @@ func getTimeSeriesTimeColumn(columns []*query.FluxColumn) *query.FluxColumn {
// Tags are appended as labels
// _measurement holds the dataframe name
// _field holds the field name.
func (fb *FrameBuilder) Append(record *query.FluxRecord) error {
func (fb *frameBuilder) Append(record *query.FluxRecord) error {
table, ok := record.ValueByKey("table").(int64)
if ok && table != fb.tableID {
fb.totalSeries++
......
......@@ -2,34 +2,13 @@ package flux
import (
"fmt"
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// Int64NOOP .....
var Int64NOOP = data.FieldConverter{
OutputFieldType: data.FieldTypeInt64,
}
// BoolNOOP .....
var BoolNOOP = data.FieldConverter{
OutputFieldType: data.FieldTypeBool,
}
// Float64NOOP .....
var Float64NOOP = data.FieldConverter{
OutputFieldType: data.FieldTypeFloat64,
}
// StringNOOP value is already in the proper format
var StringNOOP = data.FieldConverter{
OutputFieldType: data.FieldTypeString,
}
// AnyToOptionalString any value as a string
var AnyToOptionalString = data.FieldConverter{
// anyToOptionalString any value as a string.
var anyToOptionalString = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableString,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
......@@ -40,8 +19,8 @@ var AnyToOptionalString = data.FieldConverter{
},
}
// Float64ToOptionalFloat64 optional float value
var Float64ToOptionalFloat64 = data.FieldConverter{
// float64ToOptionalFloat64 optional float value
var float64ToOptionalFloat64 = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableFloat64,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
......@@ -55,8 +34,8 @@ var Float64ToOptionalFloat64 = data.FieldConverter{
},
}
// Int64ToOptionalInt64 optional int value
var Int64ToOptionalInt64 = data.FieldConverter{
// int64ToOptionalInt64 optional int value
var int64ToOptionalInt64 = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableInt64,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
......@@ -70,8 +49,8 @@ var Int64ToOptionalInt64 = data.FieldConverter{
},
}
// UInt64ToOptionalUInt64 optional int value
var UInt64ToOptionalUInt64 = data.FieldConverter{
// uint64ToOptionalUInt64 optional int value
var uint64ToOptionalUInt64 = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableUint64,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
......@@ -85,8 +64,8 @@ var UInt64ToOptionalUInt64 = data.FieldConverter{
},
}
// BoolToOptionalBool optional int value
var BoolToOptionalBool = data.FieldConverter{
// boolToOptionalBool optional int value
var boolToOptionalBool = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableBool,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
......@@ -100,8 +79,8 @@ var BoolToOptionalBool = data.FieldConverter{
},
}
// TimeToOptionalTime optional int value
var TimeToOptionalTime = data.FieldConverter{
// timeToOptionalTime optional int value
var timeToOptionalTime = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableTime,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
......@@ -114,70 +93,3 @@ var TimeToOptionalTime = data.FieldConverter{
return &val, nil
},
}
// RFC3339StringToNullableTime .....
func RFC3339StringToNullableTime(s string) (*time.Time, error) {
if s == "" {
return nil, nil
}
rv, err := time.Parse(time.RFC3339, s)
if err != nil {
return nil, err
}
u := rv.UTC()
return &u, nil
}
// StringToOptionalFloat64 string to float
var StringToOptionalFloat64 = data.FieldConverter{
OutputFieldType: data.FieldTypeNullableFloat64,
Converter: func(v interface{}) (interface{}, error) {
if v == nil {
return nil, nil
}
val, ok := v.(string)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[floatz] expected string input but got type %T", v)
}
fV, err := strconv.ParseFloat(val, 64)
return &fV, err
},
}
// Float64EpochSecondsToTime numeric seconds to time
var Float64EpochSecondsToTime = data.FieldConverter{
OutputFieldType: data.FieldTypeTime,
Converter: func(v interface{}) (interface{}, error) {
fV, ok := v.(float64)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[seconds] expected float64 input but got type %T", v)
}
return time.Unix(int64(fV), 0).UTC(), nil
},
}
// Float64EpochMillisToTime convert to time
var Float64EpochMillisToTime = data.FieldConverter{
OutputFieldType: data.FieldTypeTime,
Converter: func(v interface{}) (interface{}, error) {
fV, ok := v.(float64)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[ms] expected float64 input but got type %T", v)
}
return time.Unix(0, int64(fV)*int64(time.Millisecond)).UTC(), nil
},
}
// Boolean ...
var Boolean = data.FieldConverter{
OutputFieldType: data.FieldTypeBool,
Converter: func(v interface{}) (interface{}, error) {
fV, ok := v.(bool)
if !ok { // or return some default value instead of erroring
return nil, fmt.Errorf("[ms] expected bool input but got type %T", v)
}
return fV, nil
},
}
......@@ -9,12 +9,12 @@ import (
"github.com/influxdata/influxdb-client-go/v2/api"
)
// executeQuery runs a flux query using the QueryModel to interpolate the query and the runner to execute it.
// executeQuery runs a flux query using the queryModel to interpolate the query and the runner to execute it.
// maxSeries somehow limits the response.
func executeQuery(ctx context.Context, query QueryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) {
func executeQuery(ctx context.Context, query queryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) {
dr = backend.DataResponse{}
flux, err := Interpolate(query)
flux, err := interpolate(query)
if err != nil {
dr.Error = err
return
......@@ -50,7 +50,7 @@ func readDataFrames(result *api.QueryTableResult, maxPoints int, maxSeries int)
glog.Debug("Reading data frames from query result", "maxPoints", maxPoints, "maxSeries", maxSeries)
dr = backend.DataResponse{}
builder := &FrameBuilder{
builder := &frameBuilder{
maxPoints: maxPoints,
maxSeries: maxSeries,
}
......
......@@ -55,7 +55,7 @@ func verifyGoldenResponse(name string) (*backend.DataResponse, error) {
testDataPath: name + ".csv",
}
dr := executeQuery(context.Background(), QueryModel{MaxDataPoints: 100}, runner, 50)
dr := executeQuery(context.Background(), queryModel{MaxDataPoints: 100}, runner, 50)
err := experimental.CheckGoldenDataResponse("./testdata/"+name+".golden.txt", &dr, true)
return &dr, err
}
......
......@@ -26,29 +26,29 @@ func Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQ
tRes := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
runner, err := RunnerFromDataSource(dsInfo)
r, err := runnerFromDataSource(dsInfo)
if err != nil {
return nil, err
}
defer runner.client.Close()
defer r.client.Close()
for _, query := range tsdbQuery.Queries {
qm, err := GetQueryModelTSDB(query, tsdbQuery.TimeRange, dsInfo)
qm, err := getQueryModelTSDB(query, tsdbQuery.TimeRange, dsInfo)
if err != nil {
tRes.Results[query.RefId] = &tsdb.QueryResult{Error: err}
continue
}
res := executeQuery(context.Background(), *qm, runner, 50)
res := executeQuery(context.Background(), *qm, r, 50)
tRes.Results[query.RefId] = backendDataResponseToTSDBResponse(&res, query.RefId)
}
return tRes, nil
}
// Runner is an influxdb2 Client with an attached org property and is used
// runner is an influxdb2 Client with an attached org property and is used
// for running flux queries.
type Runner struct {
type runner struct {
client influxdb2.Client
org string
}
......@@ -59,13 +59,13 @@ type queryRunner interface {
}
// runQuery executes fluxQuery against the Runner's organization and returns a Flux typed result.
func (r *Runner) runQuery(ctx context.Context, fluxQuery string) (*api.QueryTableResult, error) {
func (r *runner) runQuery(ctx context.Context, fluxQuery string) (*api.QueryTableResult, error) {
qa := r.client.QueryAPI(r.org)
return qa.Query(ctx, fluxQuery)
}
// RunnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration).
func RunnerFromDataSource(dsInfo *models.DataSource) (*Runner, error) {
// runnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration).
func runnerFromDataSource(dsInfo *models.DataSource) (*runner, error) {
org := dsInfo.JsonData.Get("organization").MustString("")
if org == "" {
return nil, fmt.Errorf("missing organization in datasource configuration")
......@@ -86,7 +86,7 @@ func RunnerFromDataSource(dsInfo *models.DataSource) (*Runner, error) {
return nil, err
}
opts.HTTPOptions().SetHTTPClient(hc)
return &Runner{
return &runner{
client: influxdb2.NewClientWithOptions(url, token, opts),
org: org,
}, nil
......
......@@ -11,8 +11,8 @@ import (
const variableFilter = `(?m)([a-zA-Z]+)\.([a-zA-Z]+)`
// Interpolate processes macros
func Interpolate(query QueryModel) (string, error) {
// interpolate processes macros
func interpolate(query queryModel) (string, error) {
flux := query.RawQuery
variableFilterExp, err := regexp.Compile(variableFilter)
......
......@@ -6,6 +6,8 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestInterpolate(t *testing.T) {
......@@ -17,7 +19,7 @@ func TestInterpolate(t *testing.T) {
To: time.Unix(0, 0),
}
options := QueryOptions{
options := queryOptions{
Organization: "grafana1",
Bucket: "grafana2",
DefaultBucket: "grafana3",
......@@ -36,20 +38,17 @@ func TestInterpolate(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query := QueryModel{
query := queryModel{
RawQuery: tt.before,
Options: options,
TimeRange: timeRange,
MaxDataPoints: 1,
Interval: 1000 * 1000 * 1000,
}
interpolatedQuery, err := Interpolate(query)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(tt.after, interpolatedQuery); diff != "" {
t.Fatalf("Result mismatch (-want +got):\n%s", diff)
}
interpolatedQuery, err := interpolate(query)
require.NoError(t, err)
diff := cmp.Diff(tt.after, interpolatedQuery)
assert.Equal(t, "", diff)
})
}
}
......@@ -10,17 +10,17 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
// QueryOptions represents datasource configuration options
type QueryOptions struct {
// queryOptions represents datasource configuration options
type queryOptions struct {
Bucket string `json:"bucket"`
DefaultBucket string `json:"defaultBucket"`
Organization string `json:"organization"`
}
// QueryModel represents a spreadsheet query.
type QueryModel struct {
// queryModel represents a query.
type queryModel struct {
RawQuery string `json:"query"`
Options QueryOptions `json:"options"`
Options queryOptions `json:"options"`
// Not from JSON
TimeRange backend.TimeRange `json:"-"`
......@@ -31,8 +31,8 @@ type QueryModel struct {
// The following is commented out but kept as it should be useful when
// restoring this code to be closer to the SDK's models.
// func GetQueryModel(query backend.DataQuery) (*QueryModel, error) {
// model := &QueryModel{}
// func GetQueryModel(query backend.DataQuery) (*queryModel, error) {
// model := &queryModel{}
// err := json.Unmarshal(query.JSON, &model)
// if err != nil {
......@@ -46,9 +46,9 @@ type QueryModel struct {
// return model, nil
// }
// GetQueryModelTSDB builds a QueryModel from tsdb.Query information and datasource configuration (dsInfo).
func GetQueryModelTSDB(query *tsdb.Query, timeRange *tsdb.TimeRange, dsInfo *models.DataSource) (*QueryModel, error) {
model := &QueryModel{}
// getQueryModelTSDB builds a queryModel from tsdb.Query information and datasource configuration (dsInfo).
func getQueryModelTSDB(query *tsdb.Query, timeRange *tsdb.TimeRange, dsInfo *models.DataSource) (*queryModel, error) {
model := &queryModel{}
queryBytes, err := query.Model.Encode()
if err != nil {
return nil, fmt.Errorf("failed to re-encode the flux query into JSON: %w", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment