Commit e1f4287f by kay delaney Committed by GitHub

Datasource/CloudWatch: Better handling of stats grouping (#24789)

* Datasource/CloudWatch: Better handling of stats grouping
parent 44ca0527
......@@ -208,6 +208,7 @@
"@grafana/slate-react": "0.22.9-grafana",
"@reduxjs/toolkit": "1.3.4",
"@torkelo/react-select": "3.0.8",
"@types/antlr4": "^4.7.1",
"@types/braintree__sanitize-url": "4.0.0",
"@types/common-tags": "^1.8.0",
"@types/jsurl": "^1.2.28",
......@@ -221,6 +222,7 @@
"angular-native-dragdrop": "1.2.2",
"angular-route": "1.6.6",
"angular-sanitize": "1.6.6",
"antlr4": "^4.8.0",
"baron": "3.0.3",
"brace": "0.11.1",
"calculate-size": "1.1.1",
......
......@@ -4,14 +4,12 @@ import (
"context"
"fmt"
"regexp"
"strconv"
"sync"
"time"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
......@@ -137,7 +135,7 @@ func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSourc
*/
queryParams := queryContext.Queries[0].Model
_, fromAlert := queryContext.Headers["FromAlert"]
isLogAlertQuery := fromAlert && queryParams.Get("mode").MustString("") == "Logs"
isLogAlertQuery := fromAlert && queryParams.Get("queryMode").MustString("") == "Logs"
if isLogAlertQuery {
return e.executeLogAlertQuery(ctx, queryContext)
......@@ -192,11 +190,39 @@ func (e *CloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryCont
return nil, err
}
dataframe, err := queryResultsToDataframe(getQueryResultsOutput)
dataframe, err := logsResultsToDataframes(getQueryResultsOutput)
if err != nil {
return nil, err
}
statsGroups := queryParams.Get("statsGroups").MustStringArray()
if len(statsGroups) > 0 && len(dataframe.Fields) > 0 {
groupedFrames, err := groupResults(dataframe, statsGroups)
if err != nil {
return nil, err
}
encodedFrames := make([][]byte, 0)
for _, frame := range groupedFrames {
dataframeEnc, err := frame.MarshalArrow()
if err != nil {
return nil, err
}
encodedFrames = append(encodedFrames, dataframeEnc)
}
response := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
response.Results["A"] = &tsdb.QueryResult{
RefId: "A",
Dataframes: encodedFrames,
}
return response, nil
}
dataframeEnc, err := dataframe.MarshalArrow()
if err != nil {
return nil, err
......@@ -213,56 +239,6 @@ func (e *CloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryCont
return response, nil
}
func queryResultsToDataframe(results *cloudwatchlogs.GetQueryResultsOutput) (*data.Frame, error) {
rowCount := len(results.Results)
fieldValues := make(map[string]interface{})
for i, row := range results.Results {
for _, resultField := range row {
// Strip @ptr field from results as it's not needed
if *resultField.Field == "@ptr" {
continue
}
if _, exists := fieldValues[*resultField.Field]; !exists {
if _, err := time.Parse(cloudWatchTSFormat, *resultField.Value); err == nil {
fieldValues[*resultField.Field] = make([]*time.Time, rowCount)
} else if _, err := strconv.ParseFloat(*resultField.Value, 64); err == nil {
fieldValues[*resultField.Field] = make([]*float64, rowCount)
} else {
continue
}
}
if timeField, ok := fieldValues[*resultField.Field].([]*time.Time); ok {
parsedTime, err := time.Parse(cloudWatchTSFormat, *resultField.Value)
if err != nil {
return nil, err
}
timeField[i] = &parsedTime
} else if numericField, ok := fieldValues[*resultField.Field].([]*float64); ok {
parsedFloat, err := strconv.ParseFloat(*resultField.Value, 64)
if err != nil {
return nil, err
}
numericField[i] = &parsedFloat
}
}
}
newFields := make([]*data.Field, 0)
for fieldName, vals := range fieldValues {
newFields = append(newFields, data.NewField(fieldName, nil, vals))
if fieldName == "@timestamp" {
newFields[len(newFields)-1].SetConfig(&data.FieldConfig{Title: "Time"})
}
}
frame := data.NewFrame("CloudWatchLogsResponse", newFields...)
return frame, nil
}
func isTerminated(queryStatus string) bool {
return queryStatus == "Complete" || queryStatus == "Cancelled" || queryStatus == "Failed" || queryStatus == "Timeout"
}
......@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sort"
"strconv"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
......@@ -33,12 +32,11 @@ func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext
// the query response is in, there does not seem to be a way to tell
// by the response alone if/how the results should be grouped.
// Because of this, if the frontend sees that a "stats ... by ..." query is being made
// the "groupResults" parameter is sent along with the query to the backend so that we
// the "statsGroups" parameter is sent along with the query to the backend so that we
// can correctly group the CloudWatch logs response.
if query.Model.Get("groupResults").MustBool() && len(dataframe.Fields) > 0 {
groupingFields := findGroupingFields(dataframe.Fields)
groupedFrames, err := groupResults(dataframe, groupingFields)
statsGroups := query.Model.Get("statsGroups").MustStringArray()
if len(statsGroups) > 0 && len(dataframe.Fields) > 0 {
groupedFrames, err := groupResults(dataframe, statsGroups)
if err != nil {
return err
}
......@@ -81,23 +79,6 @@ func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext
return response, nil
}
func findGroupingFields(fields []*data.Field) []string {
groupingFields := make([]string, 0)
for _, field := range fields {
if field.Type().Numeric() || field.Type() == data.FieldTypeNullableTime || field.Type() == data.FieldTypeTime {
continue
}
if _, err := strconv.ParseFloat(*field.At(0).(*string), 64); err == nil {
continue
}
groupingFields = append(groupingFields, field.Name)
}
return groupingFields
}
func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext *tsdb.TsdbQuery, query *tsdb.Query) (*data.Frame, error) {
parameters := query.Model
subType := query.Model.Get("subtype").MustString()
......
package cloudwatch
import (
"sort"
"strconv"
"time"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
......@@ -47,6 +49,8 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
// Check if field is time field
if _, err := time.Parse(cloudWatchTSFormat, *resultField.Value); err == nil {
fieldValues[*resultField.Field] = make([]*time.Time, rowCount)
} else if _, err := strconv.ParseFloat(*resultField.Value, 64); err == nil {
fieldValues[*resultField.Field] = make([]*float64, rowCount)
} else {
fieldValues[*resultField.Field] = make([]*string, rowCount)
}
......@@ -59,6 +63,12 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
}
timeField[i] = &parsedTime
} else if numericField, ok := fieldValues[*resultField.Field].([]*float64); ok {
parsedFloat, err := strconv.ParseFloat(*resultField.Value, 64)
if err != nil {
return nil, err
}
numericField[i] = &parsedFloat
} else {
fieldValues[*resultField.Field].([]*string)[i] = resultField.Value
}
......@@ -90,6 +100,8 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
},
}
// Results aren't guaranteed to come ordered by time (ascending), so we need to sort
sort.Sort(ByTime(*frame))
return frame, nil
}
......
package cloudwatch
import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// ByTime implements sort.Interface for data.Frame based on the frame's time field
type ByTime data.Frame
func (a ByTime) Len() int {
if len(a.Fields) > 0 {
return a.Fields[0].Len()
}
return 0
}
func (a ByTime) Swap(i, j int) {
for _, field := range a.Fields {
temp := field.At(i)
field.Set(i, field.At(j))
field.Set(j, temp)
}
}
func (a ByTime) Less(i, j int) bool {
var timeField *data.Field = nil
for _, field := range a.Fields {
if field.Type() == data.FieldTypeNullableTime {
timeField = field
break
}
}
if timeField == nil {
return false
}
return (timeField.At(i).(*time.Time)).Before(*timeField.At(j).(*time.Time))
}
package cloudwatch
import (
"sort"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/assert"
)
func TestFrameSort(t *testing.T) {
timeA, _ := time.Parse("2006-01-02 15:04:05.000", "2020-03-02 17:04:05.000")
timeB, _ := time.Parse("2006-01-02 15:04:05.000", "2020-03-02 16:04:05.000")
timeC, _ := time.Parse("2006-01-02 15:04:05.000", "2020-03-02 15:04:05.000")
timeVals := []*time.Time{
&timeA, &timeB, &timeC,
}
timeField := data.NewField("@timestamp", nil, timeVals)
stringField := data.NewField("line", nil, []*string{
aws.String("test message 1"),
aws.String("test message 2"),
aws.String("test message 3"),
})
numberField := data.NewField("nums", nil, []*float64{
aws.Float64(20.0),
aws.Float64(50.0),
aws.Float64(17.0),
})
expectedDataframe := &data.Frame{
Name: "CloudWatchLogsResponse",
Fields: []*data.Field{
timeField,
stringField,
numberField,
},
}
sort.Sort(ByTime(*expectedDataframe))
for i := 1; i < timeField.Len(); i++ {
assert.True(t, timeField.At(i).(*time.Time).After(*(timeField.At(i - 1).(*time.Time))))
}
assert.Equal(t, *stringField.At(0).(*string), "test message 3")
assert.Equal(t, *stringField.At(1).(*string), "test message 2")
assert.Equal(t, *stringField.At(2).(*string), "test message 1")
assert.Equal(t, *numberField.At(0).(*float64), 17.0)
assert.Equal(t, *numberField.At(1).(*float64), 50.0)
assert.Equal(t, *numberField.At(2).(*float64), 20.0)
}
......@@ -33,6 +33,7 @@ import { dispatch } from 'app/store/store';
import { changeModeAction } from 'app/features/explore/state/actionTypes';
import { appEvents } from 'app/core/core';
import { InputActionMeta } from '@grafana/ui/src/components/Select/types';
import { getStatsGroups } from '../utils/query/getStatsGroups';
export interface CloudWatchLogsQueryFieldProps extends ExploreQueryFieldProps<CloudWatchDatasource, CloudWatchQuery> {
absoluteRange: AbsoluteTimeRange;
......@@ -193,6 +194,7 @@ export class CloudWatchLogsQueryField extends React.PureComponent<CloudWatchLogs
expression: value,
logGroupNames: selectedLogGroups?.map(logGroupName => logGroupName.value!) ?? [],
region: selectedRegion.value ?? 'default',
statsGroups: getStatsGroups(value),
};
onChange(nextQuery);
}
......
......@@ -17,8 +17,6 @@ import {
DataQueryResponse,
LoadingState,
toDataFrame,
guessFieldTypes,
FieldType,
LogRowModel,
} from '@grafana/data';
import { getBackendSrv, toDataQueryResponse } from '@grafana/runtime';
......@@ -137,9 +135,8 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
queryId: dataFrame.fields[0].values.get(0),
region: dataFrame.meta?.custom?.['Region'] ?? 'default',
refId: dataFrame.refId!,
groupResults: this.languageProvider.isStatsQuery(
options.targets.find(target => target.refId === dataFrame.refId)!.expression
),
statsGroups: (options.targets.find(target => target.refId === dataFrame.refId)! as CloudWatchLogsQuery)
.statsGroups,
}))
)
),
......@@ -205,7 +202,7 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
}
logsQuery(
queryParams: Array<{ queryId: string; refId: string; limit?: number; region: string; groupResults?: boolean }>
queryParams: Array<{ queryId: string; refId: string; limit?: number; region: string; statsGroups?: string[] }>
): Observable<DataQueryResponse> {
this.logQueries = {};
queryParams.forEach(param => {
......@@ -257,19 +254,15 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
}
});
}),
map(dataFrames => {
const correctedFrames = dataFrames.map(frame => correctFrameTypes(frame));
return {
data: correctedFrames,
key: 'test-key',
state: correctedFrames.every(
dataFrame => dataFrame.meta?.custom?.['Status'] === CloudWatchLogsQueryStatus.Complete
)
? LoadingState.Done
: LoadingState.Loading,
};
})
map(dataFrames => ({
data: dataFrames,
key: 'test-key',
state: dataFrames.every(
dataFrame => dataFrame.meta?.custom?.['Status'] === CloudWatchLogsQueryStatus.Complete
)
? LoadingState.Done
: LoadingState.Loading,
}))
),
() => this.stopQueries()
);
......@@ -925,22 +918,6 @@ function withTeardown<T = any>(observable: Observable<T>, onUnsubscribe: () => v
});
}
function correctFrameTypes(frame: DataFrame): DataFrame {
frame.fields.forEach(field => {
if (field.type === FieldType.string) {
field.type = FieldType.other;
}
});
const correctedFrame = guessFieldTypes(frame);
// const timeField = correctedFrame.fields.find(field => field.name === '@timestamp');
// if (timeField) {
// timeField.type = FieldType.time;
// }
return correctedFrame;
}
function parseLogGroupName(logIdentifier: string): string {
const colonIndex = logIdentifier.lastIndexOf(':');
return logIdentifier.substr(colonIndex + 1);
......
......@@ -43,6 +43,7 @@ export interface CloudWatchLogsQuery extends DataQuery {
namespace: string;
expression: string;
logGroupNames: string[];
statsGroups?: string[];
}
export type CloudWatchQuery = CloudWatchMetricsQuery | CloudWatchLogsQuery;
......
This source diff could not be displayed because it is too large. You can view the blob instead.
import { getStatsGroups } from './getStatsGroups';
describe('GroupListener', () => {
it('should correctly parse groups in stats query', () => {
const testQueries = [
{
query:
'filter @message like /Exception/ | stats count(*) as exceptionCount by bin(1h) | sort exceptionCount desc',
expected: ['bin(1h)'],
},
{
query: `filter @type = "REPORT"
| stats max(@memorySize / 1024 / 1024) as provisonedMemoryMB,
min(@maxMemoryUsed / 1024 / 1024) as smallestMemoryRequestMB,
avg(@maxMemoryUsed / 1024 / 1024) as avgMemoryUsedMB,
max(@maxMemoryUsed / 1024 / 1024) as maxMemoryUsedMB,
provisonedMemoryMB - maxMemoryUsedMB as overProvisionedMB`,
expected: [],
},
{
query: `stats count(@message) by bin(1h), @log, @logStream as fieldAlias`,
expected: ['bin(1h)', '@log', 'fieldAlias'],
},
{
query: `stats sum(packets) as packetsTransferred by srcAddr, dstAddr
| sort packetsTransferred desc
| limit 15`,
expected: ['srcAddr', 'dstAddr'],
},
{
query: `filter isIpv4InSubnet(srcAddr, "192.0.2.0/24")
| stats sum(bytes) as bytesTransferred by dstAddr
| sort bytesTransferred desc
| limit 15`,
expected: ['dstAddr'],
},
{
query: `filter logStatus="SKIPDATA"
| stats count(*) by bin(1h) as t
| sort t
`,
expected: ['t'],
},
{
query: `stats count(*) by queryType, bin(1h)`,
expected: ['queryType', 'bin(1h)'],
},
{
query: `parse @message "user=*, method:*, latency := *" as @user,
@method, @latency | stats avg(@latency) by @method,
@user`,
expected: ['@method', '@user'],
},
{
query: 'fields @timestamp, @message | sort @timestamp desc | limit 25',
expected: [],
},
{
query: `stats count(*)`,
expected: [],
},
];
for (const { query, expected } of testQueries) {
expect(getStatsGroups(query)).toStrictEqual(expected);
}
});
});
const antlr4 = require('antlr4');
const ScrollQLLexer = require('./ScrollQLLexer').ScrollQLLexer;
const ScrollQLParser = require('./ScrollQLParser').ScrollQLParser;
const ScrollQLParserListener = require('./ScrollQLParserListener').ScrollQLParserListener;
class GroupListener extends ScrollQLParserListener {
groupNames: string[] = [];
enterLogStats(ctx: any) {
this.groupNames = [];
if (ctx.groups && ctx.groups.length > 0) {
const groups = ctx.groups;
groups.forEach((group: any) => {
// This code is for handling the case where a field specifier is aliased, with the alias available via
// the proj property. Otherwise we can just take the group text as it is.
const proj = group.fieldSpec?.().proj;
if (proj) {
this.groupNames.push(proj.getText());
} else {
this.groupNames.push(group.getText());
}
});
}
}
}
export function getStatsGroups(text: string): string[] {
// Dummy prefix needed here for parser to function correctly
const dummyPrefix = 'source test start=0 end=1|';
const queryText = dummyPrefix + text;
const chars = new antlr4.InputStream(queryText);
const lexer = new ScrollQLLexer(chars);
const tokens = new antlr4.CommonTokenStream(lexer);
const parser = new ScrollQLParser(tokens);
parser.buildParseTrees = true;
const tree = parser.query();
const groupListener = new GroupListener();
antlr4.tree.ParseTreeWalker.DEFAULT.walk(groupListener, tree);
return groupListener.groupNames;
}
......@@ -5056,6 +5056,11 @@
resolved "https://registry.yarnpkg.com/@types/angular/-/angular-1.6.56.tgz#20124077bd44061e018c7283c0bb83f4b00322dd"
integrity sha512-HxtqilvklZ7i6XOaiP7uIJIrFXEVEhfbSY45nfv2DeBRngncI58Y4ZOUMiUkcT8sqgLL1ablmbfylChUg7A3GA==
"@types/antlr4@^4.7.1":
version "4.7.1"
resolved "https://registry.yarnpkg.com/@types/antlr4/-/antlr4-4.7.1.tgz#09a8f985e29149c73e92b161d08691a1fd8425ef"
integrity sha512-mjQv+WtdJnwI5qhNh5yJkZ9rVFdRClUyaO5KebaLSJFHT6uSyDLAK9jUke4zLKZXk6vQQ/QJN2j7QV2q7l5Slw==
"@types/anymatch@*":
version "1.3.1"
resolved "https://registry.yarnpkg.com/@types/anymatch/-/anymatch-1.3.1.tgz#336badc1beecb9dacc38bea2cf32adf627a8421a"
......@@ -7050,6 +7055,11 @@ ansi-to-html@^0.6.11:
dependencies:
entities "^1.1.2"
antlr4@^4.8.0:
version "4.8.0"
resolved "https://registry.yarnpkg.com/antlr4/-/antlr4-4.8.0.tgz#f938ec171be7fc2855cd3a533e87647185b32b6a"
integrity sha512-en/MxQ4OkPgGJQ3wD/muzj1uDnFSzdFIhc2+c6bHZokWkuBb6RRvFjpWhPxWLbgQvaEzldJZ0GSQpfSAaE3hqg==
any-observable@^0.3.0:
version "0.3.0"
resolved "https://registry.yarnpkg.com/any-observable/-/any-observable-0.3.0.tgz#af933475e5806a67d0d7df090dd5e8bef65d119b"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment