Commit 53175a41 by Marcus Efraimsson Committed by GitHub

Make tsdb dataframes response able to carry encoded/decoded frames (#25454)

Make tsdb dataframes response able to carry encoded and/or decoded frames
parent de1dc572
......@@ -98,7 +98,7 @@ func (tw *DatasourcePluginWrapperV2) Query(ctx context.Context, ds *models.DataS
for refID, pRes := range pbRes.Responses {
qr := &tsdb.QueryResult{
RefId: refID,
Dataframes: pRes.Frames,
Dataframes: tsdb.NewEncodedDataFrames(pRes.Frames),
}
if len(pRes.JsonMeta) != 0 {
qr.Meta = simplejson.NewFromAny(pRes.JsonMeta)
......
......@@ -111,7 +111,7 @@ func (tw *TransformWrapper) Transform(ctx context.Context, query *tsdb.TsdbQuery
for refID, res := range pbRes.Responses {
tRes := &tsdb.QueryResult{
RefId: refID,
Dataframes: res.Frames,
Dataframes: tsdb.NewEncodedDataFrames(res.Frames),
}
if len(res.JsonMeta) != 0 {
tRes.Meta = simplejson.NewFromAny(res.JsonMeta)
......@@ -190,7 +190,11 @@ func (s *transformCallback) QueryData(ctx context.Context, req *pluginv2.QueryDa
}
if res.Dataframes != nil {
pRes.Frames = res.Dataframes
encoded, err := res.Dataframes.Encoded()
if err != nil {
return nil, err
}
pRes.Frames = encoded
responses[refID] = pRes
continue
}
......
......@@ -7,7 +7,6 @@ import (
gocontext "context"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/components/simplejson"
......@@ -174,7 +173,7 @@ func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange *
useDataframes := v.Dataframes != nil && (v.Series == nil || len(v.Series) == 0)
if useDataframes { // convert the dataframes to tsdb.TimeSeries
frames, err := data.UnmarshalArrowFrames(v.Dataframes)
frames, err := v.Dataframes.Decoded()
if err != nil {
return nil, errutil.Wrap("tsdb.HandleRequest() failed to unmarshal arrow dataframes from bytes", err)
}
......
......@@ -55,7 +55,7 @@ func TestQueryCondition(t *testing.T) {
Convey("should fire when avg is above 100 on dataframe", func() {
ctx.frame = data.NewFrame("",
data.NewField("time", nil, []time.Time{time.Now()}),
data.NewField("time", nil, []time.Time{time.Now(), time.Now()}),
data.NewField("val", nil, []int64{120, 150}),
)
cr, err := ctx.exec()
......@@ -75,7 +75,7 @@ func TestQueryCondition(t *testing.T) {
Convey("Should not fire when avg is below 100 on dataframe", func() {
ctx.frame = data.NewFrame("",
data.NewField("time", nil, []time.Time{time.Now()}),
data.NewField("time", nil, []time.Time{time.Now(), time.Now()}),
data.NewField("val", nil, []int64{12, 47}),
)
cr, err := ctx.exec()
......@@ -198,12 +198,8 @@ func (ctx *queryConditionTestContext) exec() (*alerting.ConditionResult, error)
}
if ctx.frame != nil {
bFrame, err := ctx.frame.MarshalArrow()
if err != nil {
return nil, err
}
qr = &tsdb.QueryResult{
Dataframes: [][]byte{bFrame},
Dataframes: tsdb.NewDecodedDataFrames(data.Frames{ctx.frame}),
}
}
......
......@@ -179,10 +179,7 @@ func (e *AzureLogAnalyticsDatasource) executeQuery(ctx context.Context, query *A
}
}
frames := data.Frames{frame}
queryResult.Dataframes, err = frames.MarshalArrow()
if err != nil {
return queryResultError(err)
}
queryResult.Dataframes = tsdb.NewDecodedDataFrames(frames)
return queryResult
}
......
......@@ -265,6 +265,7 @@ func (e *AzureMonitorDatasource) parseResponse(queryRes *tsdb.QueryResult, amr A
return nil
}
frames := data.Frames{}
for _, series := range amr.Value[0].Timeseries {
metadataName := ""
metadataValue := ""
......@@ -303,13 +304,10 @@ func (e *AzureMonitorDatasource) parseResponse(queryRes *tsdb.QueryResult, amr A
frame.SetRow(i, point.TimeStamp, value)
}
encodedFrame, err := frame.MarshalArrow()
if err != nil {
queryRes.Error = fmt.Errorf("failed to encode dataframe response into arrow: %w", err)
frames = append(frames, frame)
}
queryRes.Dataframes = append(queryRes.Dataframes, encodedFrame)
}
queryRes.Dataframes = tsdb.NewDecodedDataFrames(frames)
return nil
}
......
......@@ -370,7 +370,7 @@ func TestAzureMonitorParseResponse(t *testing.T) {
err = datasource.parseResponse(res, azData, tt.mockQuery)
require.NoError(t, err)
frames, err := data.UnmarshalArrowFrames(res.Dataframes)
frames, err := res.Dataframes.Decoded()
require.NoError(t, err)
if diff := cmp.Diff(tt.expectedFrames, frames, data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
......
......@@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
......@@ -202,37 +204,23 @@ func (e *CloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryCont
return nil, err
}
encodedFrames := make([][]byte, 0)
for _, frame := range groupedFrames {
dataframeEnc, err := frame.MarshalArrow()
if err != nil {
return nil, err
}
encodedFrames = append(encodedFrames, dataframeEnc)
}
response := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
response.Results["A"] = &tsdb.QueryResult{
RefId: "A",
Dataframes: encodedFrames,
Dataframes: tsdb.NewDecodedDataFrames(groupedFrames),
}
return response, nil
}
dataframeEnc, err := dataframe.MarshalArrow()
if err != nil {
return nil, err
}
response := &tsdb.Response{
Results: map[string]*tsdb.QueryResult{
"A": {
RefId: "A",
Dataframes: [][]byte{dataframeEnc},
Dataframes: tsdb.NewDecodedDataFrames(data.Frames{dataframe}),
},
},
}
......
......@@ -41,25 +41,11 @@ func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext
return err
}
encodedFrames := make([][]byte, 0)
for _, frame := range groupedFrames {
dataframeEnc, err := frame.MarshalArrow()
if err != nil {
return err
}
encodedFrames = append(encodedFrames, dataframeEnc)
}
resultChan <- &tsdb.QueryResult{RefId: query.RefId, Dataframes: encodedFrames}
resultChan <- &tsdb.QueryResult{RefId: query.RefId, Dataframes: tsdb.NewDecodedDataFrames(groupedFrames)}
return nil
}
dataframeEnc, err := dataframe.MarshalArrow()
if err != nil {
return err
}
resultChan <- &tsdb.QueryResult{RefId: query.RefId, Dataframes: [][]byte{dataframeEnc}}
resultChan <- &tsdb.QueryResult{RefId: query.RefId, Dataframes: tsdb.NewDecodedDataFrames(data.Frames{dataframe})}
return nil
})
}
......
package tsdb
import (
"encoding/json"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
......@@ -36,7 +39,7 @@ type QueryResult struct {
Meta *simplejson.Json `json:"meta,omitempty"`
Series TimeSeriesSlice `json:"series"`
Tables []*Table `json:"tables"`
Dataframes [][]byte `json:"dataframes"`
Dataframes DataFrames `json:"dataframes"`
}
type TimeSeries struct {
......@@ -85,3 +88,80 @@ func NewTimeSeries(name string, points TimeSeriesPoints) *TimeSeries {
Points: points,
}
}
// DataFrames interface for retrieving encoded and decoded data frames.
//
// See NewDecodedDataFrames and NewEncodedDataFrames for more information.
type DataFrames interface {
// Encoded encodes Frames into a slice of []byte.
// If an error occurs [][]byte will be nil.
// The encoded result, if any, will be cached and returned next time Encoded is called.
Encoded() ([][]byte, error)
// Decoded decodes a slice of Arrow encoded frames to data.Frames ([]*data.Frame).
// If an error occurs Frames will be nil.
// The decoded result, if any, will be cached and returned next time Decoded is called.
Decoded() (data.Frames, error)
}
type dataFrames struct {
decoded data.Frames
encoded [][]byte
}
// NewDecodedDataFrames create new DataFrames from decoded frames.
//
// This should be the primary function for creating DataFrames if your implementing a plugin.
// In Grafana alerting scenario it needs to operate on decoded frames why this function is
// preferrable. When encoded data frames is needed, e.g. returned from Grafana HTTP API, it will
// happen automatically when MarshalJSON() is called.
func NewDecodedDataFrames(decodedFrames data.Frames) DataFrames {
return &dataFrames{
decoded: decodedFrames,
}
}
// NewEncodedDataFrames create new DataFrames from encoded frames.
//
// This one is primarily used for creating DataFrames when receiving encoded data frames from an external
// plugin or similar. This may allow the encoded data frames to be returned to Grafana UI without any additional
// decoding/encoding required. In Grafana alerting scenario it needs to operate on decoded data frames why encoded
// frames needs to be decoded before usage.
func NewEncodedDataFrames(encodedFrames [][]byte) DataFrames {
return &dataFrames{
encoded: encodedFrames,
}
}
func (df *dataFrames) Encoded() ([][]byte, error) {
if df.encoded == nil {
encoded, err := df.decoded.MarshalArrow()
if err != nil {
return nil, err
}
df.encoded = encoded
}
return df.encoded, nil
}
func (df *dataFrames) Decoded() (data.Frames, error) {
if df.decoded == nil {
decoded, err := data.UnmarshalArrowFrames(df.encoded)
if err != nil {
return nil, err
}
df.decoded = decoded
}
return df.decoded, nil
}
func (df *dataFrames) MarshalJSON() ([]byte, error) {
encoded, err := df.Encoded()
if err != nil {
return nil, err
}
return json.Marshal(encoded)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment