Commit 3a9a4f4a by Ryan McKinley Committed by GitHub

InfluxDB/Flux: support timeseries with a renamed time field (#25910)

parent bb4b7381
...@@ -2,6 +2,7 @@ package flux ...@@ -2,6 +2,7 @@ package flux
import ( import (
"fmt" "fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/influxdata/influxdb-client-go/api/query" "github.com/influxdata/influxdb-client-go/api/query"
...@@ -38,6 +39,8 @@ type FrameBuilder struct { ...@@ -38,6 +39,8 @@ type FrameBuilder struct {
maxSeries int // max number of series maxSeries int // max number of series
totalSeries int totalSeries int
isTimeSeries bool isTimeSeries bool
timeColumn string // sometimes it is not `_time`
timeDisplay string
} }
func isTag(schk string) bool { func isTag(schk string) bool {
...@@ -79,6 +82,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { ...@@ -79,6 +82,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
fb.value = nil fb.value = nil
fb.columns = make([]columnInfo, 0) fb.columns = make([]columnInfo, 0)
fb.isTimeSeries = false fb.isTimeSeries = false
fb.timeColumn = ""
for _, col := range columns { for _, col := range columns {
switch { switch {
...@@ -97,7 +101,17 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { ...@@ -97,7 +101,17 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
} }
} }
if !fb.isTimeSeries { if fb.isTimeSeries {
col := getTimeSeriesTimeColumn(columns)
if col == nil {
return fmt.Errorf("no time column in timeSeries")
}
fb.timeColumn = col.Name()
fb.timeDisplay = "Time"
if "_time" != fb.timeColumn {
fb.timeDisplay = col.Name()
}
} else {
fb.labels = make([]string, 0) fb.labels = make([]string, 0)
for _, col := range columns { for _, col := range columns {
converter, err := getConverter(col.DataType()) converter, err := getConverter(col.DataType())
...@@ -114,6 +128,23 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { ...@@ -114,6 +128,23 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
return nil return nil
} }
func getTimeSeriesTimeColumn(columns []*query.FluxColumn) *query.FluxColumn {
// First look for '_time' column
for _, col := range columns {
if col.Name() == "_time" && col.DataType() == timeDatatypeRFC || col.DataType() == timeDatatypeRFCNano {
return col
}
}
// Then any time column
for _, col := range columns {
if col.DataType() == timeDatatypeRFC || col.DataType() == timeDatatypeRFCNano {
return col
}
}
return nil
}
// Append appends a single entry from an influxdb2 record to a data frame // Append appends a single entry from an influxdb2 record to a data frame
// Values are appended to _value // Values are appended to _value
// Tags are appended as labels // Tags are appended as labels
...@@ -139,7 +170,7 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error { ...@@ -139,7 +170,7 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error {
data.NewFieldFromFieldType(fb.value.OutputFieldType, 0), data.NewFieldFromFieldType(fb.value.OutputFieldType, 0),
) )
fb.active.Fields[0].Name = "Time" fb.active.Fields[0].Name = fb.timeDisplay
name, ok := record.ValueByKey("_field").(string) name, ok := record.ValueByKey("_field").(string)
if ok { if ok {
fb.active.Fields[1].Name = name fb.active.Fields[1].Name = name
...@@ -168,11 +199,16 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error { ...@@ -168,11 +199,16 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error {
} }
if fb.isTimeSeries { if fb.isTimeSeries {
time, ok := record.ValueByKey(fb.timeColumn).(time.Time)
if !ok {
return fmt.Errorf("unable to get time colum: %s", fb.timeColumn)
}
val, err := fb.value.Converter(record.Value()) val, err := fb.value.Converter(record.Value())
if err != nil { if err != nil {
return err return err
} }
fb.active.Fields[0].Append(record.Time()) fb.active.Fields[0].Append(time)
fb.active.Fields[1].Append(val) fb.active.Fields[1].Append(val)
} else { } else {
// Table view // Table view
......
...@@ -209,6 +209,51 @@ func TestAggregateGrouping(t *testing.T) { ...@@ -209,6 +209,51 @@ func TestAggregateGrouping(t *testing.T) {
}) })
} }
func TestNonStandardTimeColumn(t *testing.T) {
ctx := context.Background()
t.Run("Time Column", func(t *testing.T) {
runner := &MockRunner{
testDataPath: "non_standard_time_column.csv",
}
dr := ExecuteQuery(ctx, QueryModel{MaxDataPoints: 100}, runner, 50)
if dr.Error != nil {
t.Fatal(dr.Error)
}
if len(dr.Frames) != 1 {
t.Fatal("Expected one frame")
}
str, _ := dr.Frames[0].StringTable(-1, -1)
fmt.Println(str)
// Dimensions: 2 Fields by 1 Rows
// +-----------------------------------------+------------------+
// | Name: _start_water | Name: |
// | Labels: | Labels: st=1 |
// | Type: []time.Time | Type: []*float64 |
// +-----------------------------------------+------------------+
// | 2020-06-28 17:50:13.012584046 +0000 UTC | 156.304 |
// +-----------------------------------------+------------------+
expectedFrame := data.NewFrame("",
data.NewField("_start_water", nil, []time.Time{
time.Date(2020, 6, 28, 17, 50, 13, 12584046, time.UTC),
}),
data.NewField("", map[string]string{"st": "1"}, []*float64{
pointer.Float64(156.304),
}),
)
expectedFrame.Meta = &data.FrameMeta{}
if diff := cmp.Diff(expectedFrame, dr.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
}
func TestBuckets(t *testing.T) { func TestBuckets(t *testing.T) {
ctx := context.Background() ctx := context.Background()
......
#group,false,false,true,true,false,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,double,string
#default,_result,,,,,
,result,table,_start_water,_stop_water,_value,st
,,0,2020-06-28T17:50:13.012584046Z,2020-06-29T17:50:13.012584046Z,156.304,1
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment