Commit c863fd3d by Arve Knudsen Committed by GitHub

CloudWatch: Clean up code (#26259)

* CloudWatch: Clean up code
parent 747bdaf2
......@@ -12,7 +12,7 @@ import (
"github.com/grafana/grafana/pkg/util/errutil"
)
func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
result := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
......@@ -36,7 +36,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
actionPrefix := parameters.Get("actionPrefix").MustString("")
alarmNamePrefix := parameters.Get("alarmNamePrefix").MustString("")
svc, err := e.getClient(region)
cli, err := e.getCWClient(region)
if err != nil {
return nil, err
}
......@@ -48,7 +48,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
ActionPrefix: aws.String(actionPrefix),
AlarmNamePrefix: aws.String(alarmNamePrefix),
}
resp, err := svc.DescribeAlarms(params)
resp, err := cli.DescribeAlarms(params)
if err != nil {
return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarms", err)
}
......@@ -79,7 +79,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
Statistic: aws.String(s),
Period: aws.Int64(period),
}
resp, err := svc.DescribeAlarmsForMetric(params)
resp, err := cli.DescribeAlarmsForMetric(params)
if err != nil {
return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarmsForMetric", err)
}
......@@ -106,7 +106,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
EndDate: aws.Time(endTime),
MaxRecords: aws.Int64(100),
}
resp, err := svc.DescribeAlarmHistory(params)
resp, err := cli.DescribeAlarmHistory(params)
if err != nil {
return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarmHistory", err)
}
......
......@@ -9,25 +9,19 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
)
type CloudWatchExecutor struct {
*models.DataSource
ec2Svc ec2iface.EC2API
rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
logsClientsByRegion map[string](*cloudwatchlogs.CloudWatchLogs)
mux sync.Mutex
}
type DatasourceInfo struct {
type datasourceInfo struct {
Profile string
Region string
AuthType string
......@@ -40,56 +34,88 @@ type DatasourceInfo struct {
}
const cloudWatchTSFormat = "2006-01-02 15:04:05.000"
const defaultRegion = "default"
// Constants also defined in datasource/cloudwatch/datasource.ts
const logIdentifierInternal = "__log__grafana_internal__"
const logStreamIdentifierInternal = "__logstream__grafana_internal__"
func (e *CloudWatchExecutor) getLogsClient(region string) (*cloudwatchlogs.CloudWatchLogs, error) {
e.mux.Lock()
defer e.mux.Unlock()
var plog = log.New("tsdb.cloudwatch")
var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
if logsClient, ok := e.logsClientsByRegion[region]; ok {
return logsClient, nil
}
func init() {
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", newcloudWatchExecutor)
}
dsInfo := retrieveDsInfo(e.DataSource, region)
newLogsClient, err := retrieveLogsClient(dsInfo)
func newcloudWatchExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
e := &cloudWatchExecutor{
DataSource: datasource,
}
dsInfo := e.getDSInfo(defaultRegion)
defaultLogsClient, err := retrieveLogsClient(dsInfo)
if err != nil {
return nil, err
}
e.logsClientsByRegion = map[string]*cloudwatchlogs.CloudWatchLogs{
dsInfo.Region: defaultLogsClient,
defaultRegion: defaultLogsClient,
}
e.logsClientsByRegion[region] = newLogsClient
return e, nil
}
return newLogsClient, nil
// cloudWatchExecutor executes CloudWatch requests.
type cloudWatchExecutor struct {
*models.DataSource
ec2Svc ec2iface.EC2API
rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
logsClientsByRegion map[string](*cloudwatchlogs.CloudWatchLogs)
mux sync.Mutex
}
func NewCloudWatchExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
dsInfo := retrieveDsInfo(datasource, "default")
defaultLogsClient, err := retrieveLogsClient(dsInfo)
func (e *cloudWatchExecutor) getCWClient(region string) (*cloudwatch.CloudWatch, error) {
datasourceInfo := e.getDSInfo(region)
cfg, err := getAwsConfig(datasourceInfo)
if err != nil {
return nil, err
}
sess, err := newSession(cfg)
if err != nil {
return nil, err
}
logsClientsByRegion := make(map[string](*cloudwatchlogs.CloudWatchLogs))
logsClientsByRegion[dsInfo.Region] = defaultLogsClient
logsClientsByRegion["default"] = defaultLogsClient
client := cloudwatch.New(sess, cfg)
client.Handlers.Send.PushFront(func(r *request.Request) {
r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
})
return &CloudWatchExecutor{
logsClientsByRegion: logsClientsByRegion,
}, nil
return client, nil
}
var plog = log.New("tsdb.cloudwatch")
var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
func (e *cloudWatchExecutor) getCWLogsClient(region string) (*cloudwatchlogs.CloudWatchLogs, error) {
e.mux.Lock()
defer e.mux.Unlock()
func init() {
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", NewCloudWatchExecutor)
if logsClient, ok := e.logsClientsByRegion[region]; ok {
return logsClient, nil
}
dsInfo := e.getDSInfo(region)
newLogsClient, err := retrieveLogsClient(dsInfo)
if err != nil {
return nil, err
}
e.logsClientsByRegion[region] = newLogsClient
return newLogsClient, nil
}
func (e *CloudWatchExecutor) alertQuery(ctx context.Context, logsClient *cloudwatchlogs.CloudWatchLogs, queryContext *tsdb.TsdbQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient *cloudwatchlogs.CloudWatchLogs, queryContext *tsdb.TsdbQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
const maxAttempts = 8
const pollPeriod = 1000 * time.Millisecond
......@@ -126,7 +152,8 @@ func (e *CloudWatchExecutor) alertQuery(ctx context.Context, logsClient *cloudwa
return nil, nil
}
func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
// Query executes a CloudWatch query.
func (e *cloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
e.DataSource = dsInfo
/*
......@@ -163,18 +190,18 @@ func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSourc
return result, err
}
func (e *CloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
queryParams := queryContext.Queries[0].Model
queryParams.Set("subtype", "StartQuery")
queryParams.Set("queryString", queryParams.Get("expression").MustString(""))
region := queryParams.Get("region").MustString("default")
if region == "default" {
region := queryParams.Get("region").MustString(defaultRegion)
if region == defaultRegion {
region = e.DataSource.JsonData.Get("defaultRegion").MustString()
queryParams.Set("region", region)
}
logsClient, err := e.getLogsClient(region)
logsClient, err := e.getCWLogsClient(region)
if err != nil {
return nil, err
}
......@@ -227,6 +254,49 @@ func (e *CloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryCont
return response, nil
}
func (e *cloudWatchExecutor) getDSInfo(region string) *datasourceInfo {
if region == defaultRegion {
region = e.DataSource.JsonData.Get("defaultRegion").MustString()
}
authType := e.DataSource.JsonData.Get("authType").MustString()
assumeRoleArn := e.DataSource.JsonData.Get("assumeRoleArn").MustString()
externalID := e.DataSource.JsonData.Get("externalId").MustString()
decrypted := e.DataSource.DecryptedValues()
accessKey := decrypted["accessKey"]
secretKey := decrypted["secretKey"]
return &datasourceInfo{
Region: region,
Profile: e.DataSource.Database,
AuthType: authType,
AssumeRoleArn: assumeRoleArn,
ExternalID: externalID,
AccessKey: accessKey,
SecretKey: secretKey,
}
}
func retrieveLogsClient(dsInfo *datasourceInfo) (*cloudwatchlogs.CloudWatchLogs, error) {
cfg, err := getAwsConfig(dsInfo)
if err != nil {
return nil, err
}
sess, err := newSession(cfg)
if err != nil {
return nil, err
}
client := cloudwatchlogs.New(sess, cfg)
client.Handlers.Send.PushFront(func(r *request.Request) {
r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
})
return client, nil
}
func isTerminated(queryStatus string) bool {
return queryStatus == "Complete" || queryStatus == "Cancelled" || queryStatus == "Failed" || queryStatus == "Timeout"
}
......@@ -14,23 +14,18 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/aws/aws-sdk-go/service/sts/stsiface"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
)
type cache struct {
credential *credentials.Credentials
type envelope struct {
credentials *credentials.Credentials
expiration *time.Time
}
var awsCredentialCache = make(map[string]cache)
var credentialCacheLock sync.RWMutex
var awsCredsCache = map[string]envelope{}
var credsCacheLock sync.RWMutex
// Session factory.
// Stubbable by tests.
......@@ -50,18 +45,17 @@ var newEC2Metadata = func(p client.ConfigProvider, cfgs ...*aws.Config) *ec2meta
return ec2metadata.New(p, cfgs...)
}
func getCredentials(dsInfo *DatasourceInfo) (*credentials.Credentials, error) {
func getCredentials(dsInfo *datasourceInfo) (*credentials.Credentials, error) {
cacheKey := fmt.Sprintf("%s:%s:%s:%s", dsInfo.AuthType, dsInfo.AccessKey, dsInfo.Profile, dsInfo.AssumeRoleArn)
credentialCacheLock.RLock()
if _, ok := awsCredentialCache[cacheKey]; ok {
if awsCredentialCache[cacheKey].expiration != nil &&
awsCredentialCache[cacheKey].expiration.After(time.Now().UTC()) {
result := awsCredentialCache[cacheKey].credential
credentialCacheLock.RUnlock()
credsCacheLock.RLock()
if env, ok := awsCredsCache[cacheKey]; ok {
if env.expiration != nil && env.expiration.After(time.Now().UTC()) {
result := env.credentials
credsCacheLock.RUnlock()
return result, nil
}
}
credentialCacheLock.RUnlock()
credsCacheLock.RUnlock()
accessKeyID := ""
secretAccessKey := ""
......@@ -135,12 +129,12 @@ func getCredentials(dsInfo *DatasourceInfo) (*credentials.Credentials, error) {
remoteCredProvider(sess),
})
credentialCacheLock.Lock()
awsCredentialCache[cacheKey] = cache{
credential: creds,
credsCacheLock.Lock()
awsCredsCache[cacheKey] = envelope{
credentials: creds,
expiration: expiration,
}
credentialCacheLock.Unlock()
credsCacheLock.Unlock()
return creds, nil
}
......@@ -178,37 +172,7 @@ func ec2RoleProvider(sess *session.Session) credentials.Provider {
return &ec2rolecreds.EC2RoleProvider{Client: newEC2Metadata(sess), ExpiryWindow: 5 * time.Minute}
}
func (e *CloudWatchExecutor) getDsInfo(region string) *DatasourceInfo {
return retrieveDsInfo(e.DataSource, region)
}
func retrieveDsInfo(datasource *models.DataSource, region string) *DatasourceInfo {
defaultRegion := datasource.JsonData.Get("defaultRegion").MustString()
if region == "default" {
region = defaultRegion
}
authType := datasource.JsonData.Get("authType").MustString()
assumeRoleArn := datasource.JsonData.Get("assumeRoleArn").MustString()
externalID := datasource.JsonData.Get("externalId").MustString()
decrypted := datasource.DecryptedValues()
accessKey := decrypted["accessKey"]
secretKey := decrypted["secretKey"]
datasourceInfo := &DatasourceInfo{
Region: region,
Profile: datasource.Database,
AuthType: authType,
AssumeRoleArn: assumeRoleArn,
ExternalID: externalID,
AccessKey: accessKey,
SecretKey: secretKey,
}
return datasourceInfo
}
func getAwsConfig(dsInfo *DatasourceInfo) (*aws.Config, error) {
func getAwsConfig(dsInfo *datasourceInfo) (*aws.Config, error) {
creds, err := getCredentials(dsInfo)
if err != nil {
return nil, err
......@@ -221,44 +185,3 @@ func getAwsConfig(dsInfo *DatasourceInfo) (*aws.Config, error) {
return cfg, nil
}
func (e *CloudWatchExecutor) getClient(region string) (*cloudwatch.CloudWatch, error) {
datasourceInfo := e.getDsInfo(region)
cfg, err := getAwsConfig(datasourceInfo)
if err != nil {
return nil, err
}
sess, err := newSession(cfg)
if err != nil {
return nil, err
}
client := cloudwatch.New(sess, cfg)
client.Handlers.Send.PushFront(func(r *request.Request) {
r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
})
return client, nil
}
func retrieveLogsClient(datasourceInfo *DatasourceInfo) (*cloudwatchlogs.CloudWatchLogs, error) {
cfg, err := getAwsConfig(datasourceInfo)
if err != nil {
return nil, err
}
sess, err := newSession(cfg)
if err != nil {
return nil, err
}
client := cloudwatchlogs.New(sess, cfg)
client.Handlers.Send.PushFront(func(r *request.Request) {
r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
})
return client, nil
}
......@@ -87,7 +87,7 @@ func TestGetCredentials_ARNAuthType(t *testing.T) {
}, nil).
Times(1)
creds, err := getCredentials(&DatasourceInfo{
creds, err := getCredentials(&datasourceInfo{
AuthType: "arn",
})
require.NoError(t, err)
......@@ -113,7 +113,7 @@ func TestGetCredentials_ARNAuthType(t *testing.T) {
}, nil).
Times(1)
creds, err := getCredentials(&DatasourceInfo{
creds, err := getCredentials(&datasourceInfo{
AuthType: "arn",
ExternalID: "external-id",
})
......
......@@ -8,7 +8,7 @@ import (
"github.com/grafana/grafana/pkg/infra/metrics"
)
func (e *CloudWatchExecutor) executeRequest(ctx context.Context, client cloudWatchClient, metricDataInput *cloudwatch.GetMetricDataInput) ([]*cloudwatch.GetMetricDataOutput, error) {
func (e *cloudWatchExecutor) executeRequest(ctx context.Context, client cloudWatchClient, metricDataInput *cloudwatch.GetMetricDataInput) ([]*cloudwatch.GetMetricDataOutput, error) {
mdo := make([]*cloudwatch.GetMetricDataOutput, 0)
nextToken := ""
......
......@@ -35,7 +35,7 @@ func (client *cloudWatchFakeClient) GetMetricDataWithContext(ctx aws.Context, in
}
func TestGetMetricDataExecutorTest(t *testing.T) {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
inputs := &cloudwatch.GetMetricDataInput{MetricDataQueries: []*cloudwatch.MetricDataQuery{}}
res, err := executor.executeRequest(context.Background(), &cloudWatchFakeClient{}, inputs)
require.NoError(t, err)
......
......@@ -15,7 +15,7 @@ import (
"golang.org/x/sync/errgroup"
)
func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries))
eg, ectx := errgroup.WithContext(ctx)
......@@ -73,13 +73,13 @@ func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext
return response, nil
}
func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext *tsdb.TsdbQuery, query *tsdb.Query) (*data.Frame, error) {
func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, queryContext *tsdb.TsdbQuery, query *tsdb.Query) (*data.Frame, error) {
parameters := query.Model
subType := query.Model.Get("subtype").MustString()
defaultRegion := e.DataSource.JsonData.Get("defaultRegion").MustString()
region := parameters.Get("region").MustString(defaultRegion)
logsClient, err := e.getLogsClient(region)
logsClient, err := e.getCWLogsClient(region)
if err != nil {
return nil, err
}
......@@ -100,7 +100,6 @@ func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext
case "GetLogEvents":
data, err = e.handleGetLogEvents(ctx, logsClient, parameters)
}
if err != nil {
return nil, err
}
......@@ -108,7 +107,8 @@ func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext
return data, nil
}
func (e *CloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) {
func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json) (*data.Frame, error) {
queryRequest := &cloudwatchlogs.GetLogEventsInput{
Limit: aws.Int64(parameters.Get("limit").MustInt64(10)),
StartFromHead: aws.Bool(parameters.Get("startFromHead").MustBool(false)),
......@@ -159,7 +159,8 @@ func (e *CloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient
return data.NewFrame("logEvents", timestampField, messageField), nil
}
func (e *CloudWatchExecutor) handleDescribeLogGroups(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) {
func (e *cloudWatchExecutor) handleDescribeLogGroups(ctx context.Context,
logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) {
logGroupNamePrefix := parameters.Get("logGroupNamePrefix").MustString("")
var response *cloudwatchlogs.DescribeLogGroupsOutput = nil
......@@ -189,7 +190,8 @@ func (e *CloudWatchExecutor) handleDescribeLogGroups(ctx context.Context, logsCl
return frame, nil
}
func (e *CloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, timeRange *tsdb.TimeRange) (*cloudwatchlogs.StartQueryOutput, error) {
func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json, timeRange *tsdb.TimeRange) (*cloudwatchlogs.StartQueryOutput, error) {
startTime, err := timeRange.ParseFrom()
if err != nil {
return nil, err
......@@ -224,7 +226,8 @@ func (e *CloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
return logsClient.StartQueryWithContext(ctx, startQueryInput)
}
func (e *CloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, timeRange *tsdb.TimeRange, refID string) (*data.Frame, error) {
func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json, timeRange *tsdb.TimeRange, refID string) (*data.Frame, error) {
startQueryResponse, err := e.executeStartQuery(ctx, logsClient, parameters, timeRange)
if err != nil {
return nil, err
......@@ -244,7 +247,8 @@ func (e *CloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cl
return dataFrame, nil
}
func (e *CloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*cloudwatchlogs.StopQueryOutput, error) {
func (e *cloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json) (*cloudwatchlogs.StopQueryOutput, error) {
queryInput := &cloudwatchlogs.StopQueryInput{
QueryId: aws.String(parameters.Get("queryId").MustString()),
}
......@@ -264,7 +268,8 @@ func (e *CloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cl
return response, err
}
func (e *CloudWatchExecutor) handleStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) {
func (e *cloudWatchExecutor) handleStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json) (*data.Frame, error) {
response, err := e.executeStopQuery(ctx, logsClient, parameters)
if err != nil {
return nil, err
......@@ -274,7 +279,8 @@ func (e *CloudWatchExecutor) handleStopQuery(ctx context.Context, logsClient clo
return dataFrame, nil
}
func (e *CloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) {
func (e *cloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) {
queryInput := &cloudwatchlogs.GetQueryResultsInput{
QueryId: aws.String(parameters.Get("queryId").MustString()),
}
......@@ -282,7 +288,8 @@ func (e *CloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsCli
return logsClient.GetQueryResultsWithContext(ctx, queryInput)
}
func (e *CloudWatchExecutor) handleGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, refID string) (*data.Frame, error) {
func (e *cloudWatchExecutor) handleGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json, refID string) (*data.Frame, error) {
getQueryResultsOutput, err := e.executeGetQueryResults(ctx, logsClient, parameters)
if err != nil {
return nil, err
......@@ -299,7 +306,8 @@ func (e *CloudWatchExecutor) handleGetQueryResults(ctx context.Context, logsClie
return dataFrame, nil
}
func (e *CloudWatchExecutor) handleGetLogGroupFields(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, refID string) (*data.Frame, error) {
func (e *cloudWatchExecutor) handleGetLogGroupFields(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json, refID string) (*data.Frame, error) {
queryInput := &cloudwatchlogs.GetLogGroupFieldsInput{
LogGroupName: aws.String(parameters.Get("logGroupName").MustString()),
Time: aws.Int64(parameters.Get("time").MustInt64()),
......
......@@ -21,7 +21,7 @@ import (
//***
func TestHandleDescribeLogGroups_WhenLogGroupNamePrefixIsEmpty(t *testing.T) {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
logsClient := &FakeLogsClient{
Config: aws.Config{
......@@ -43,7 +43,7 @@ func TestHandleDescribeLogGroups_WhenLogGroupNamePrefixIsEmpty(t *testing.T) {
}
func TestHandleDescribeLogGroups_WhenLogGroupNamePrefixIsNotEmpty(t *testing.T) {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
logsClient := &FakeLogsClient{
Config: aws.Config{
......@@ -64,7 +64,7 @@ func TestHandleDescribeLogGroups_WhenLogGroupNamePrefixIsNotEmpty(t *testing.T)
}
func TestHandleGetLogGroupFields_WhenLogGroupNamePrefixIsNotEmpty(t *testing.T) {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
logsClient := &FakeLogsClient{
Config: aws.Config{
......@@ -89,7 +89,7 @@ func TestHandleGetLogGroupFields_WhenLogGroupNamePrefixIsNotEmpty(t *testing.T)
}
func TestExecuteStartQuery(t *testing.T) {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
logsClient := &FakeLogsClient{
Config: aws.Config{
......@@ -117,7 +117,7 @@ func TestExecuteStartQuery(t *testing.T) {
}
func TestHandleStartQuery(t *testing.T) {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
logsClient := &FakeLogsClient{
Config: aws.Config{
......@@ -152,7 +152,7 @@ func TestHandleStartQuery(t *testing.T) {
}
func TestHandleStopQuery(t *testing.T) {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
logsClient := &FakeLogsClient{
Config: aws.Config{
......@@ -174,7 +174,7 @@ func TestHandleStopQuery(t *testing.T) {
}
func TestHandleGetQueryResults(t *testing.T) {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
logsClient := &FakeLogsClient{
Config: aws.Config{
......
......@@ -7,7 +7,8 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatch"
)
func (e *CloudWatchExecutor) buildMetricDataInput(startTime time.Time, endTime time.Time, queries map[string]*cloudWatchQuery) (*cloudwatch.GetMetricDataInput, error) {
func (e *cloudWatchExecutor) buildMetricDataInput(startTime time.Time, endTime time.Time,
queries map[string]*cloudWatchQuery) (*cloudwatch.GetMetricDataInput, error) {
metricDataInput := &cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
......
......@@ -10,7 +10,7 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatch"
)
func (e *CloudWatchExecutor) buildMetricDataQuery(query *cloudWatchQuery) (*cloudwatch.MetricDataQuery, error) {
func (e *cloudWatchExecutor) buildMetricDataQuery(query *cloudWatchQuery) (*cloudwatch.MetricDataQuery, error) {
mdq := &cloudwatch.MetricDataQuery{
Id: aws.String(query.Id),
ReturnData: aws.Bool(query.ReturnData),
......
......@@ -44,13 +44,16 @@ func (m mockedRGTA) GetResourcesPages(in *resourcegroupstaggingapi.GetResourcesI
func TestCloudWatchMetrics(t *testing.T) {
t.Run("When calling getMetricsForCustomMetrics", func(t *testing.T) {
dsInfo := &DatasourceInfo{
Region: "us-east-1",
Namespace: "Foo",
Profile: "default",
AssumeRoleArn: "",
const region = "us-east-1"
e := &cloudWatchExecutor{
DataSource: &models.DataSource{
Database: "default",
JsonData: simplejson.NewFromAny(map[string]interface{}{
"Region": region,
}),
},
}
f := func(dsInfo *DatasourceInfo) (cloudwatch.ListMetricsOutput, error) {
f := func(region string) (cloudwatch.ListMetricsOutput, error) {
return cloudwatch.ListMetricsOutput{
Metrics: []*cloudwatch.Metric{
{
......@@ -64,20 +67,23 @@ func TestCloudWatchMetrics(t *testing.T) {
},
}, nil
}
metrics, err := getMetricsForCustomMetrics(dsInfo, f)
metrics, err := e.getMetricsForCustomMetrics(region, f)
require.NoError(t, err)
assert.Contains(t, metrics, "Test_MetricName")
})
t.Run("When calling getDimensionsForCustomMetrics", func(t *testing.T) {
dsInfo := &DatasourceInfo{
Region: "us-east-1",
Namespace: "Foo",
Profile: "default",
AssumeRoleArn: "",
const region = "us-east-1"
e := &cloudWatchExecutor{
DataSource: &models.DataSource{
Database: "default",
JsonData: simplejson.NewFromAny(map[string]interface{}{
"Region": region,
}),
},
}
f := func(dsInfo *DatasourceInfo) (cloudwatch.ListMetricsOutput, error) {
f := func(region string) (cloudwatch.ListMetricsOutput, error) {
return cloudwatch.ListMetricsOutput{
Metrics: []*cloudwatch.Metric{
{
......@@ -91,14 +97,14 @@ func TestCloudWatchMetrics(t *testing.T) {
},
}, nil
}
dimensionKeys, err := getDimensionsForCustomMetrics(dsInfo, f)
dimensionKeys, err := e.getDimensionsForCustomMetrics(region, f)
require.NoError(t, err)
assert.Contains(t, dimensionKeys, "Test_DimensionName")
})
t.Run("When calling handleGetRegions", func(t *testing.T) {
executor := &CloudWatchExecutor{
executor := &cloudWatchExecutor{
ec2Svc: mockedEc2{RespRegions: ec2.DescribeRegionsOutput{
Regions: []*ec2.Region{
{
......@@ -123,7 +129,7 @@ func TestCloudWatchMetrics(t *testing.T) {
})
t.Run("When calling handleGetEc2InstanceAttribute", func(t *testing.T) {
executor := &CloudWatchExecutor{
executor := &cloudWatchExecutor{
ec2Svc: mockedEc2{Resp: ec2.DescribeInstancesOutput{
Reservations: []*ec2.Reservation{
{
......@@ -156,7 +162,7 @@ func TestCloudWatchMetrics(t *testing.T) {
})
t.Run("When calling handleGetEbsVolumeIds", func(t *testing.T) {
executor := &CloudWatchExecutor{
executor := &cloudWatchExecutor{
ec2Svc: mockedEc2{Resp: ec2.DescribeInstancesOutput{
Reservations: []*ec2.Reservation{
{
......@@ -217,7 +223,7 @@ func TestCloudWatchMetrics(t *testing.T) {
})
t.Run("When calling handleGetResourceArns", func(t *testing.T) {
executor := &CloudWatchExecutor{
executor := &cloudWatchExecutor{
rgtaSvc: mockedRGTA{
Resp: resourcegroupstaggingapi.GetResourcesOutput{
ResourceTagMappingList: []*resourcegroupstaggingapi.ResourceTagMapping{
......
......@@ -13,7 +13,7 @@ import (
// has more than one statistic defined, one cloudwatchQuery will be created for each statistic.
// If the query doesn't have an Id defined by the user, we'll give it an with format `query[RefId]`. In the case
// the incoming query had more than one stat, it will ge an id like `query[RefId]_[StatName]`, eg queryC_Average
func (e *CloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQueries []*requestQuery) (map[string]*cloudWatchQuery, error) {
func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQueries []*requestQuery) (map[string]*cloudWatchQuery, error) {
cloudwatchQueries := make(map[string]*cloudWatchQuery)
for _, requestQuery := range requestQueries {
for _, stat := range requestQuery.Statistics {
......@@ -50,7 +50,7 @@ func (e *CloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQ
return cloudwatchQueries, nil
}
func (e *CloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchResponses []*cloudwatchResponse) map[string]*tsdb.QueryResult {
func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchResponses []*cloudwatchResponse) map[string]*tsdb.QueryResult {
responsesByRefID := make(map[string][]*cloudwatchResponse)
for _, res := range cloudwatchResponses {
responsesByRefID[res.RefId] = append(responsesByRefID[res.RefId], res)
......
......@@ -10,7 +10,7 @@ import (
func TestQueryTransformer(t *testing.T) {
Convey("TestQueryTransformer", t, func() {
Convey("when transforming queries", func() {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
Convey("one cloudwatchQuery is generated when its request query has one stat", func() {
requestQueries := []*requestQuery{
{
......
......@@ -15,7 +15,7 @@ import (
)
// Parses the json queries and returns a requestQuery. The requestQuery has a 1 to 1 mapping to a query editor row
func (e *CloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) {
func (e *cloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) {
requestQueries := make(map[string][]*requestQuery)
for i, model := range queryContext.Queries {
queryType := model.Model.Get("type").MustString()
......
......@@ -12,7 +12,7 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
func (e *CloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMetricDataOutput, queries map[string]*cloudWatchQuery) ([]*cloudwatchResponse, error) {
func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMetricDataOutput, queries map[string]*cloudWatchQuery) ([]*cloudwatchResponse, error) {
mdr := make(map[string]map[string]*cloudwatch.MetricDataResult)
for _, mdo := range metricDataOutputs {
requestExceededMaxLimit := false
......
......@@ -9,7 +9,7 @@ import (
"golang.org/x/sync/errgroup"
)
func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
startTime, err := queryContext.TimeRange.ParseFrom()
if err != nil {
return nil, err
......@@ -50,7 +50,7 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
}
}()
client, err := e.getClient(region)
client, err := e.getCWClient(region)
if err != nil {
return err
}
......
......@@ -9,7 +9,7 @@ import (
)
func TestTimeSeriesQuery(t *testing.T) {
executor := &CloudWatchExecutor{}
executor := &cloudWatchExecutor{}
t.Run("End time before start time should result in error", func(t *testing.T) {
_, err := executor.executeTimeSeriesQuery(context.TODO(), &tsdb.TsdbQuery{TimeRange: tsdb.NewTimeRange("now-1h", "now-2h")})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment