Commit 581d2eaa by Arve Knudsen Committed by GitHub

Revert "Convert CloudWatch to use dataframes (#26702)" (#27448)

This reverts commit 77628398.
parent 029278fa
...@@ -5,12 +5,10 @@ import ( ...@@ -5,12 +5,10 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/grafana/grafana/pkg/infra/metrics" "github.com/grafana/grafana/pkg/infra/metrics"
) )
func (e *cloudWatchExecutor) executeRequest(ctx context.Context, client cloudwatchiface.CloudWatchAPI, func (e *cloudWatchExecutor) executeRequest(ctx context.Context, client cloudWatchClient, metricDataInput *cloudwatch.GetMetricDataInput) ([]*cloudwatch.GetMetricDataOutput, error) {
metricDataInput *cloudwatch.GetMetricDataInput) ([]*cloudwatch.GetMetricDataOutput, error) {
mdo := make([]*cloudwatch.GetMetricDataOutput, 0) mdo := make([]*cloudwatch.GetMetricDataOutput, 0)
nextToken := "" nextToken := ""
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -15,7 +14,6 @@ import ( ...@@ -15,7 +14,6 @@ import (
var counter = 1 var counter = 1
type cloudWatchFakeClient struct { type cloudWatchFakeClient struct {
cloudwatchiface.CloudWatchAPI
} }
func (client *cloudWatchFakeClient) GetMetricDataWithContext(ctx aws.Context, input *cloudwatch.GetMetricDataInput, opts ...request.Option) (*cloudwatch.GetMetricDataOutput, error) { func (client *cloudWatchFakeClient) GetMetricDataWithContext(ctx aws.Context, input *cloudwatch.GetMetricDataInput, opts ...request.Option) (*cloudwatch.GetMetricDataOutput, error) {
......
...@@ -17,8 +17,8 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d ...@@ -17,8 +17,8 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
} }
nonEmptyRows := make([][]*cloudwatchlogs.ResultField, 0) nonEmptyRows := make([][]*cloudwatchlogs.ResultField, 0)
for _, row := range response.Results {
// Sometimes CloudWatch can send empty rows // Sometimes CloudWatch can send empty rows
for _, row := range response.Results {
if len(row) == 0 { if len(row) == 0 {
continue continue
} }
...@@ -26,7 +26,7 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d ...@@ -26,7 +26,7 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
if row[0].Value == nil { if row[0].Value == nil {
continue continue
} }
// Sometimes it sends rows with only timestamp // Sometimes it sends row with only timestamp
if _, err := time.Parse(cloudWatchTSFormat, *row[0].Value); err == nil { if _, err := time.Parse(cloudWatchTSFormat, *row[0].Value); err == nil {
continue continue
} }
...@@ -52,7 +52,7 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d ...@@ -52,7 +52,7 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
if _, exists := fieldValues[*resultField.Field]; !exists { if _, exists := fieldValues[*resultField.Field]; !exists {
fieldNames = append(fieldNames, *resultField.Field) fieldNames = append(fieldNames, *resultField.Field)
// Check if it's a time field // Check if field is time field
if _, err := time.Parse(cloudWatchTSFormat, *resultField.Value); err == nil { if _, err := time.Parse(cloudWatchTSFormat, *resultField.Value); err == nil {
fieldValues[*resultField.Field] = make([]*time.Time, rowCount) fieldValues[*resultField.Field] = make([]*time.Time, rowCount)
} else if _, err := strconv.ParseFloat(*resultField.Value, 64); err == nil { } else if _, err := strconv.ParseFloat(*resultField.Value, 64); err == nil {
...@@ -81,7 +81,7 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d ...@@ -81,7 +81,7 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
} }
} }
newFields := make([]*data.Field, 0, len(fieldNames)) newFields := make([]*data.Field, 0)
for _, fieldName := range fieldNames { for _, fieldName := range fieldNames {
newFields = append(newFields, data.NewField(fieldName, nil, fieldValues[fieldName])) newFields = append(newFields, data.NewField(fieldName, nil, fieldValues[fieldName]))
......
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"sort" "sort"
"strings" "strings"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/tsdb"
) )
...@@ -14,9 +13,7 @@ import ( ...@@ -14,9 +13,7 @@ import (
// has more than one statistic defined, one cloudwatchQuery will be created for each statistic. // has more than one statistic defined, one cloudwatchQuery will be created for each statistic.
// If the query doesn't have an Id defined by the user, we'll give it an with format `query[RefId]`. In the case // If the query doesn't have an Id defined by the user, we'll give it an with format `query[RefId]`. In the case
// the incoming query had more than one stat, it will ge an id like `query[RefId]_[StatName]`, eg queryC_Average // the incoming query had more than one stat, it will ge an id like `query[RefId]_[StatName]`, eg queryC_Average
func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQueries []*requestQuery) ( func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQueries []*requestQuery) (map[string]*cloudWatchQuery, error) {
map[string]*cloudWatchQuery, error) {
plog.Debug("Transforming CloudWatch request queries")
cloudwatchQueries := make(map[string]*cloudWatchQuery) cloudwatchQueries := make(map[string]*cloudWatchQuery)
for _, requestQuery := range requestQueries { for _, requestQuery := range requestQueries {
for _, stat := range requestQuery.Statistics { for _, stat := range requestQuery.Statistics {
...@@ -55,22 +52,17 @@ func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQ ...@@ -55,22 +52,17 @@ func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQ
func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchResponses []*cloudwatchResponse) map[string]*tsdb.QueryResult { func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchResponses []*cloudwatchResponse) map[string]*tsdb.QueryResult {
responsesByRefID := make(map[string][]*cloudwatchResponse) responsesByRefID := make(map[string][]*cloudwatchResponse)
refIDs := sort.StringSlice{}
for _, res := range cloudwatchResponses { for _, res := range cloudwatchResponses {
refIDs = append(refIDs, res.RefId)
responsesByRefID[res.RefId] = append(responsesByRefID[res.RefId], res) responsesByRefID[res.RefId] = append(responsesByRefID[res.RefId], res)
} }
// Ensure stable results
refIDs.Sort()
results := make(map[string]*tsdb.QueryResult) results := make(map[string]*tsdb.QueryResult)
for _, refID := range refIDs { for refID, responses := range responsesByRefID {
responses := responsesByRefID[refID]
queryResult := tsdb.NewQueryResult() queryResult := tsdb.NewQueryResult()
queryResult.RefId = refID queryResult.RefId = refID
queryResult.Meta = simplejson.New() queryResult.Meta = simplejson.New()
queryResult.Series = tsdb.TimeSeriesSlice{} queryResult.Series = tsdb.TimeSeriesSlice{}
frames := make(data.Frames, 0, len(responses)) timeSeries := make(tsdb.TimeSeriesSlice, 0)
requestExceededMaxLimit := false requestExceededMaxLimit := false
partialData := false partialData := false
...@@ -80,7 +72,7 @@ func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchRespo ...@@ -80,7 +72,7 @@ func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchRespo
}{} }{}
for _, response := range responses { for _, response := range responses {
frames = append(frames, response.DataFrames...) timeSeries = append(timeSeries, *response.series...)
requestExceededMaxLimit = requestExceededMaxLimit || response.RequestExceededMaxLimit requestExceededMaxLimit = requestExceededMaxLimit || response.RequestExceededMaxLimit
partialData = partialData || response.PartialData partialData = partialData || response.PartialData
queryMeta = append(queryMeta, struct { queryMeta = append(queryMeta, struct {
...@@ -93,8 +85,8 @@ func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchRespo ...@@ -93,8 +85,8 @@ func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchRespo
}) })
} }
sort.Slice(frames, func(i, j int) bool { sort.Slice(timeSeries, func(i, j int) bool {
return frames[i].Name < frames[j].Name return timeSeries[i].Name < timeSeries[j].Name
}) })
if requestExceededMaxLimit { if requestExceededMaxLimit {
...@@ -104,7 +96,7 @@ func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchRespo ...@@ -104,7 +96,7 @@ func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchRespo
queryResult.ErrorString = "Cloudwatch GetMetricData error: Too many datapoints requested - your search has been limited. Please try to reduce the time range" queryResult.ErrorString = "Cloudwatch GetMetricData error: Too many datapoints requested - your search has been limited. Please try to reduce the time range"
} }
queryResult.Dataframes = tsdb.NewDecodedDataFrames(frames) queryResult.Series = append(queryResult.Series, timeSeries...)
queryResult.Meta.Set("gmdMeta", queryMeta) queryResult.Meta.Set("gmdMeta", queryMeta)
results[refID] = queryResult results[refID] = queryResult
} }
......
...@@ -17,13 +17,13 @@ import ( ...@@ -17,13 +17,13 @@ import (
// Parses the json queries and returns a requestQuery. The requestQuery has a 1 to 1 mapping to a query editor row // Parses the json queries and returns a requestQuery. The requestQuery has a 1 to 1 mapping to a query editor row
func (e *cloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) { func (e *cloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) {
requestQueries := make(map[string][]*requestQuery) requestQueries := make(map[string][]*requestQuery)
for i, query := range queryContext.Queries { for i, model := range queryContext.Queries {
queryType := query.Model.Get("type").MustString() queryType := model.Model.Get("type").MustString()
if queryType != "timeSeriesQuery" && queryType != "" { if queryType != "timeSeriesQuery" && queryType != "" {
continue continue
} }
refID := query.RefId refID := queryContext.Queries[i].RefId
query, err := parseRequestQuery(queryContext.Queries[i].Model, refID, startTime, endTime) query, err := parseRequestQuery(queryContext.Queries[i].Model, refID, startTime, endTime)
if err != nil { if err != nil {
return nil, &queryError{err: err, RefID: refID} return nil, &queryError{err: err, RefID: refID}
...@@ -39,7 +39,6 @@ func (e *cloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTim ...@@ -39,7 +39,6 @@ func (e *cloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTim
} }
func parseRequestQuery(model *simplejson.Json, refId string, startTime time.Time, endTime time.Time) (*requestQuery, error) { func parseRequestQuery(model *simplejson.Json, refId string, startTime time.Time, endTime time.Time) (*requestQuery, error) {
plog.Debug("Parsing request query", "query", model)
reNumber := regexp.MustCompile(`^\d+$`) reNumber := regexp.MustCompile(`^\d+$`)
region, err := model.Get("region").String() region, err := model.Get("region").String()
if err != nil { if err != nil {
......
...@@ -8,12 +8,11 @@ import ( ...@@ -8,12 +8,11 @@ import (
"time" "time"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/tsdb"
) )
func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMetricDataOutput, func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMetricDataOutput, queries map[string]*cloudWatchQuery) ([]*cloudwatchResponse, error) {
queries map[string]*cloudWatchQuery) ([]*cloudwatchResponse, error) {
plog.Debug("Parsing metric data output", "queries", queries)
// Map from result ID -> label -> result // Map from result ID -> label -> result
mdrs := make(map[string]map[string]*cloudwatch.MetricDataResult) mdrs := make(map[string]map[string]*cloudwatch.MetricDataResult)
labels := map[string][]string{} labels := map[string][]string{}
...@@ -49,15 +48,14 @@ func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMe ...@@ -49,15 +48,14 @@ func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMe
cloudWatchResponses := make([]*cloudwatchResponse, 0) cloudWatchResponses := make([]*cloudwatchResponse, 0)
for id, lr := range mdrs { for id, lr := range mdrs {
plog.Debug("Handling metric data results", "id", id, "lr", lr)
query := queries[id] query := queries[id]
frames, partialData, err := parseGetMetricDataTimeSeries(lr, labels[id], query) series, partialData, err := parseGetMetricDataTimeSeries(lr, labels[id], query)
if err != nil { if err != nil {
return nil, err return nil, err
} }
response := &cloudwatchResponse{ response := &cloudwatchResponse{
DataFrames: frames, series: series,
Period: query.Period, Period: query.Period,
Expression: query.UsedExpression, Expression: query.UsedExpression,
RefId: query.RefId, RefId: query.RefId,
...@@ -72,15 +70,12 @@ func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMe ...@@ -72,15 +70,12 @@ func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMe
} }
func parseGetMetricDataTimeSeries(metricDataResults map[string]*cloudwatch.MetricDataResult, labels []string, func parseGetMetricDataTimeSeries(metricDataResults map[string]*cloudwatch.MetricDataResult, labels []string,
query *cloudWatchQuery) (data.Frames, bool, error) { query *cloudWatchQuery) (*tsdb.TimeSeriesSlice, bool, error) {
plog.Debug("Parsing metric data results", "results", metricDataResults)
partialData := false partialData := false
frames := data.Frames{} result := tsdb.TimeSeriesSlice{}
for _, label := range labels { for _, label := range labels {
metricDataResult := metricDataResults[label] metricDataResult := metricDataResults[label]
plog.Debug("Processing metric data result", "label", label, "statusCode", metricDataResult.StatusCode)
if *metricDataResult.StatusCode != "Complete" { if *metricDataResult.StatusCode != "Complete" {
plog.Debug("Handling a partial result")
partialData = true partialData = true
} }
...@@ -90,8 +85,8 @@ func parseGetMetricDataTimeSeries(metricDataResults map[string]*cloudwatch.Metri ...@@ -90,8 +85,8 @@ func parseGetMetricDataTimeSeries(metricDataResults map[string]*cloudwatch.Metri
} }
} }
// In case a multi-valued dimension is used and the cloudwatch query yields no values, create one empty time // In case a multi-valued dimension is used and the cloudwatch query yields no values, create one empty time series for each dimension value.
// series for each dimension value. Use that dimension value to expand the alias field // Use that dimension value to expand the alias field
if len(metricDataResult.Values) == 0 && query.isMultiValuedDimensionExpression() { if len(metricDataResult.Values) == 0 && query.isMultiValuedDimensionExpression() {
series := 0 series := 0
multiValuedDimension := "" multiValuedDimension := ""
...@@ -103,78 +98,62 @@ func parseGetMetricDataTimeSeries(metricDataResults map[string]*cloudwatch.Metri ...@@ -103,78 +98,62 @@ func parseGetMetricDataTimeSeries(metricDataResults map[string]*cloudwatch.Metri
} }
for _, value := range query.Dimensions[multiValuedDimension] { for _, value := range query.Dimensions[multiValuedDimension] {
tags := map[string]string{multiValuedDimension: value} emptySeries := tsdb.TimeSeries{
Tags: map[string]string{multiValuedDimension: value},
Points: make([]tsdb.TimePoint, 0),
}
for key, values := range query.Dimensions { for key, values := range query.Dimensions {
if key != multiValuedDimension && len(values) > 0 { if key != multiValuedDimension && len(values) > 0 {
tags[key] = values[0] emptySeries.Tags[key] = values[0]
} }
} }
emptyFrame := data.Frame{ emptySeries.Name = formatAlias(query, query.Stats, emptySeries.Tags, label)
Name: formatAlias(query, query.Stats, tags, label), result = append(result, &emptySeries)
Fields: []*data.Field{
data.NewField("timestamp", nil, []float64{}),
data.NewField("value", tags, []*float64{}),
},
}
frames = append(frames, &emptyFrame)
} }
} else { } else {
dims := make([]string, 0, len(query.Dimensions)) keys := make([]string, 0)
for k := range query.Dimensions { for k := range query.Dimensions {
dims = append(dims, k) keys = append(keys, k)
} }
sort.Strings(dims) sort.Strings(keys)
tags := data.Labels{} series := tsdb.TimeSeries{
for _, dim := range dims { Tags: make(map[string]string),
plog.Debug("Handling dimension", "dimension", dim) Points: make([]tsdb.TimePoint, 0),
values := query.Dimensions[dim] }
for _, key := range keys {
values := query.Dimensions[key]
if len(values) == 1 && values[0] != "*" { if len(values) == 1 && values[0] != "*" {
plog.Debug("Got a tag value", "tag", dim, "value", values[0]) series.Tags[key] = values[0]
tags[dim] = values[0]
} else { } else {
for _, value := range values { for _, value := range values {
if value == label || value == "*" { if value == label || value == "*" {
plog.Debug("Got a tag value", "tag", dim, "value", value, "label", label) series.Tags[key] = label
tags[dim] = label
} else if strings.Contains(label, value) { } else if strings.Contains(label, value) {
plog.Debug("Got a tag value", "tag", dim, "value", value, "label", label) series.Tags[key] = value
tags[dim] = value
} }
} }
} }
} }
timestamps := []float64{} series.Name = formatAlias(query, query.Stats, series.Tags, label)
points := []*float64{}
for j, t := range metricDataResult.Timestamps { for j, t := range metricDataResult.Timestamps {
if j > 0 { if j > 0 {
expectedTimestamp := metricDataResult.Timestamps[j-1].Add(time.Duration(query.Period) * time.Second) expectedTimestamp := metricDataResult.Timestamps[j-1].Add(time.Duration(query.Period) * time.Second)
if expectedTimestamp.Before(*t) { if expectedTimestamp.Before(*t) {
timestamps = append(timestamps, float64(expectedTimestamp.Unix()*1000)) series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(expectedTimestamp.Unix()*1000)))
points = append(points, nil)
} }
} }
val := metricDataResult.Values[j] series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(*metricDataResult.Values[j]),
plog.Debug("Handling timestamp", "timestamp", t, "value", *val) float64(t.Unix())*1000))
timestamps = append(timestamps, float64(t.Unix()*1000))
points = append(points, val)
} }
result = append(result, &series)
fields := []*data.Field{
data.NewField("timestamp", nil, timestamps),
data.NewField("value", tags, points),
}
frame := data.Frame{
Name: formatAlias(query, query.Stats, tags, label),
Fields: fields,
} }
frames = append(frames, &frame)
} }
} return &result, partialData, nil
return frames, partialData, nil
} }
func formatAlias(query *cloudWatchQuery, stat string, dimensions map[string]string, label string) string { func formatAlias(query *cloudWatchQuery, stat string, dimensions map[string]string, label string) string {
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/grafana/grafana/pkg/components/null"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
) )
...@@ -60,17 +61,17 @@ func TestCloudWatchResponseParser(t *testing.T) { ...@@ -60,17 +61,17 @@ func TestCloudWatchResponseParser(t *testing.T) {
Period: 60, Period: 60,
Alias: "{{LoadBalancer}} Expanded", Alias: "{{LoadBalancer}} Expanded",
} }
frames, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query) series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
So(err, ShouldBeNil) timeSeries := (*series)[0]
frame1 := frames[0] So(err, ShouldBeNil)
So(partialData, ShouldBeFalse) So(partialData, ShouldBeFalse)
So(frame1.Name, ShouldEqual, "lb1 Expanded") So(timeSeries.Name, ShouldEqual, "lb1 Expanded")
So(frame1.Fields[1].Labels["LoadBalancer"], ShouldEqual, "lb1") So(timeSeries.Tags["LoadBalancer"], ShouldEqual, "lb1")
frame2 := frames[1] timeSeries2 := (*series)[1]
So(frame2.Name, ShouldEqual, "lb2 Expanded") So(timeSeries2.Name, ShouldEqual, "lb2 Expanded")
So(frame2.Fields[1].Labels["LoadBalancer"], ShouldEqual, "lb2") So(timeSeries2.Tags["LoadBalancer"], ShouldEqual, "lb2")
}) })
Convey("can expand dimension value using substring", func() { Convey("can expand dimension value using substring", func() {
...@@ -122,17 +123,16 @@ func TestCloudWatchResponseParser(t *testing.T) { ...@@ -122,17 +123,16 @@ func TestCloudWatchResponseParser(t *testing.T) {
Period: 60, Period: 60,
Alias: "{{LoadBalancer}} Expanded", Alias: "{{LoadBalancer}} Expanded",
} }
frames, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query) series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
timeSeries := (*series)[0]
So(err, ShouldBeNil) So(err, ShouldBeNil)
frame1 := frames[0]
So(partialData, ShouldBeFalse) So(partialData, ShouldBeFalse)
So(frame1.Name, ShouldEqual, "lb1 Expanded") So(timeSeries.Name, ShouldEqual, "lb1 Expanded")
So(frame1.Fields[1].Labels["LoadBalancer"], ShouldEqual, "lb1") So(timeSeries.Tags["LoadBalancer"], ShouldEqual, "lb1")
frame2 := frames[1] timeSeries2 := (*series)[1]
So(frame2.Name, ShouldEqual, "lb2 Expanded") So(timeSeries2.Name, ShouldEqual, "lb2 Expanded")
So(frame2.Fields[1].Labels["LoadBalancer"], ShouldEqual, "lb2") So(timeSeries2.Tags["LoadBalancer"], ShouldEqual, "lb2")
}) })
Convey("can expand dimension value using wildcard", func() { Convey("can expand dimension value using wildcard", func() {
...@@ -184,12 +184,12 @@ func TestCloudWatchResponseParser(t *testing.T) { ...@@ -184,12 +184,12 @@ func TestCloudWatchResponseParser(t *testing.T) {
Period: 60, Period: 60,
Alias: "{{LoadBalancer}} Expanded", Alias: "{{LoadBalancer}} Expanded",
} }
frames, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query) series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
So(err, ShouldBeNil)
So(err, ShouldBeNil)
So(partialData, ShouldBeFalse) So(partialData, ShouldBeFalse)
So(frames[0].Name, ShouldEqual, "lb3 Expanded") So((*series)[0].Name, ShouldEqual, "lb3 Expanded")
So(frames[1].Name, ShouldEqual, "lb4 Expanded") So((*series)[1].Name, ShouldEqual, "lb4 Expanded")
}) })
Convey("can expand dimension value when no values are returned and a multi-valued template variable is used", func() { Convey("can expand dimension value when no values are returned and a multi-valued template variable is used", func() {
...@@ -221,13 +221,13 @@ func TestCloudWatchResponseParser(t *testing.T) { ...@@ -221,13 +221,13 @@ func TestCloudWatchResponseParser(t *testing.T) {
Period: 60, Period: 60,
Alias: "{{LoadBalancer}} Expanded", Alias: "{{LoadBalancer}} Expanded",
} }
frames, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query) series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
So(err, ShouldBeNil)
So(err, ShouldBeNil)
So(partialData, ShouldBeFalse) So(partialData, ShouldBeFalse)
So(len(frames), ShouldEqual, 2) So(len(*series), ShouldEqual, 2)
So(frames[0].Name, ShouldEqual, "lb1 Expanded") So((*series)[0].Name, ShouldEqual, "lb1 Expanded")
So(frames[1].Name, ShouldEqual, "lb2 Expanded") So((*series)[1].Name, ShouldEqual, "lb2 Expanded")
}) })
Convey("can expand dimension value when no values are returned and a multi-valued template variable and two single-valued dimensions are used", func() { Convey("can expand dimension value when no values are returned and a multi-valued template variable and two single-valued dimensions are used", func() {
...@@ -261,13 +261,13 @@ func TestCloudWatchResponseParser(t *testing.T) { ...@@ -261,13 +261,13 @@ func TestCloudWatchResponseParser(t *testing.T) {
Period: 60, Period: 60,
Alias: "{{LoadBalancer}} Expanded {{InstanceType}} - {{Resource}}", Alias: "{{LoadBalancer}} Expanded {{InstanceType}} - {{Resource}}",
} }
frames, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query) series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
So(err, ShouldBeNil)
So(err, ShouldBeNil)
So(partialData, ShouldBeFalse) So(partialData, ShouldBeFalse)
So(len(frames), ShouldEqual, 2) So(len(*series), ShouldEqual, 2)
So(frames[0].Name, ShouldEqual, "lb1 Expanded micro - res") So((*series)[0].Name, ShouldEqual, "lb1 Expanded micro - res")
So(frames[1].Name, ShouldEqual, "lb2 Expanded micro - res") So((*series)[1].Name, ShouldEqual, "lb2 Expanded micro - res")
}) })
Convey("can parse cloudwatch response", func() { Convey("can parse cloudwatch response", func() {
...@@ -304,18 +304,17 @@ func TestCloudWatchResponseParser(t *testing.T) { ...@@ -304,18 +304,17 @@ func TestCloudWatchResponseParser(t *testing.T) {
Period: 60, Period: 60,
Alias: "{{namespace}}_{{metric}}_{{stat}}", Alias: "{{namespace}}_{{metric}}_{{stat}}",
} }
frames, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query) series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
So(err, ShouldBeNil) timeSeries := (*series)[0]
frame := frames[0] So(err, ShouldBeNil)
So(partialData, ShouldBeFalse) So(partialData, ShouldBeFalse)
So(frame.Name, ShouldEqual, "AWS/ApplicationELB_TargetResponseTime_Average") So(timeSeries.Name, ShouldEqual, "AWS/ApplicationELB_TargetResponseTime_Average")
So(frame.Fields[1].Labels["LoadBalancer"], ShouldEqual, "lb") So(timeSeries.Tags["LoadBalancer"], ShouldEqual, "lb")
So(frame.Fields[1].Len(), ShouldEqual, 4) So(timeSeries.Points[0][0].String(), ShouldEqual, null.FloatFrom(10.0).String())
So(*frame.Fields[1].At(0).(*float64), ShouldEqual, 10.0) So(timeSeries.Points[1][0].String(), ShouldEqual, null.FloatFrom(20.0).String())
So(*frame.Fields[1].At(1).(*float64), ShouldEqual, 20.0) So(timeSeries.Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
So(frame.Fields[1].At(2).(*float64), ShouldBeNil) So(timeSeries.Points[3][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
So(*frame.Fields[1].At(3).(*float64), ShouldEqual, 30.0)
}) })
}) })
} }
...@@ -6,19 +6,17 @@ import ( ...@@ -6,19 +6,17 @@ import (
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/util/errutil"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
plog.Debug("Executing time series query")
startTime, err := queryContext.TimeRange.ParseFrom() startTime, err := queryContext.TimeRange.ParseFrom()
if err != nil { if err != nil {
return nil, errutil.Wrap("failed to parse start time", err) return nil, err
} }
endTime, err := queryContext.TimeRange.ParseTo() endTime, err := queryContext.TimeRange.ParseTo()
if err != nil { if err != nil {
return nil, errutil.Wrap("failed to parse end time", err) return nil, err
} }
if !startTime.Before(endTime) { if !startTime.Before(endTime) {
return nil, fmt.Errorf("invalid time range: start time must be before end time") return nil, fmt.Errorf("invalid time range: start time must be before end time")
......
...@@ -3,9 +3,16 @@ package cloudwatch ...@@ -3,9 +3,16 @@ package cloudwatch
import ( import (
"fmt" "fmt"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/grafana/grafana/pkg/tsdb"
) )
type cloudWatchClient interface {
GetMetricDataWithContext(ctx aws.Context, input *cloudwatch.GetMetricDataInput, opts ...request.Option) (*cloudwatch.GetMetricDataOutput, error)
}
type requestQuery struct { type requestQuery struct {
RefId string RefId string
Region string Region string
...@@ -24,7 +31,7 @@ type requestQuery struct { ...@@ -24,7 +31,7 @@ type requestQuery struct {
} }
type cloudwatchResponse struct { type cloudwatchResponse struct {
DataFrames data.Frames series *tsdb.TimeSeriesSlice
Id string Id string
RefId string RefId string
Expression string Expression string
......
...@@ -89,7 +89,7 @@ func NewTimeSeries(name string, points TimeSeriesPoints) *TimeSeries { ...@@ -89,7 +89,7 @@ func NewTimeSeries(name string, points TimeSeriesPoints) *TimeSeries {
} }
} }
// DataFrames is an interface for retrieving encoded and decoded data frames. // DataFrames interface for retrieving encoded and decoded data frames.
// //
// See NewDecodedDataFrames and NewEncodedDataFrames for more information. // See NewDecodedDataFrames and NewEncodedDataFrames for more information.
type DataFrames interface { type DataFrames interface {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment