Commit 43ef052d by Patrik Karlström Committed by GitHub

cloudwatch: Consolidate client logic (#25555)

* cloudwatch: Consolidate client logic

Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
parent 80edbbe3
...@@ -9,11 +9,17 @@ import ( ...@@ -9,11 +9,17 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface" "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface" "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
...@@ -45,75 +51,95 @@ var plog = log.New("tsdb.cloudwatch") ...@@ -45,75 +51,95 @@ var plog = log.New("tsdb.cloudwatch")
var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`) var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
func init() { func init() {
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", newcloudWatchExecutor) tsdb.RegisterTsdbQueryEndpoint("cloudwatch", func(ds *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return newExecutor(), nil
})
} }
func newcloudWatchExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { func newExecutor() *cloudWatchExecutor {
e := &cloudWatchExecutor{ return &cloudWatchExecutor{
DataSource: datasource, logsClientsByRegion: map[string]cloudwatchlogsiface.CloudWatchLogsAPI{},
} }
dsInfo := e.getDSInfo(defaultRegion)
defaultLogsClient, err := retrieveLogsClient(dsInfo)
if err != nil {
return nil, err
}
e.logsClientsByRegion = map[string]*cloudwatchlogs.CloudWatchLogs{
dsInfo.Region: defaultLogsClient,
defaultRegion: defaultLogsClient,
}
return e, nil
} }
// cloudWatchExecutor executes CloudWatch requests. // cloudWatchExecutor executes CloudWatch requests.
type cloudWatchExecutor struct { type cloudWatchExecutor struct {
*models.DataSource *models.DataSource
ec2Svc ec2iface.EC2API
rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
logsClientsByRegion map[string](*cloudwatchlogs.CloudWatchLogs) ec2Client ec2iface.EC2API
mux sync.Mutex rgtaClient resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
logsClientsByRegion map[string]cloudwatchlogsiface.CloudWatchLogsAPI
mtx sync.Mutex
} }
func (e *cloudWatchExecutor) getCWClient(region string) (*cloudwatch.CloudWatch, error) { func (e *cloudWatchExecutor) newSession(region string) (*session.Session, error) {
datasourceInfo := e.getDSInfo(region) dsInfo := e.getDSInfo(region)
cfg, err := getAwsConfig(datasourceInfo) creds, err := getCredentials(dsInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sess, err := newSession(cfg) cfg := &aws.Config{
Region: aws.String(dsInfo.Region),
Credentials: creds,
}
return newSession(cfg)
}
func (e *cloudWatchExecutor) getCWClient(region string) (cloudwatchiface.CloudWatchAPI, error) {
sess, err := e.newSession(region)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newCWClient(sess), nil
client := cloudwatch.New(sess, cfg)
client.Handlers.Send.PushFront(func(r *request.Request) {
r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
})
return client, nil
} }
func (e *cloudWatchExecutor) getCWLogsClient(region string) (*cloudwatchlogs.CloudWatchLogs, error) { func (e *cloudWatchExecutor) getCWLogsClient(region string) (cloudwatchlogsiface.CloudWatchLogsAPI, error) {
e.mux.Lock() e.mtx.Lock()
defer e.mux.Unlock() defer e.mtx.Unlock()
if logsClient, ok := e.logsClientsByRegion[region]; ok { if logsClient, ok := e.logsClientsByRegion[region]; ok {
return logsClient, nil return logsClient, nil
} }
dsInfo := e.getDSInfo(region) sess, err := e.newSession(region)
newLogsClient, err := retrieveLogsClient(dsInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
e.logsClientsByRegion[region] = newLogsClient logsClient := newCWLogsClient(sess)
e.logsClientsByRegion[region] = logsClient
return logsClient, nil
}
func (e *cloudWatchExecutor) getEC2Client(region string) (ec2iface.EC2API, error) {
if e.ec2Client != nil {
return e.ec2Client, nil
}
sess, err := e.newSession(region)
if err != nil {
return nil, err
}
e.ec2Client = newEC2Client(sess)
return newLogsClient, nil return e.ec2Client, nil
}
func (e *cloudWatchExecutor) getRGTAClient(region string) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI,
error) {
if e.rgtaClient != nil {
return e.rgtaClient, nil
}
sess, err := e.newSession(region)
if err != nil {
return nil, err
}
e.rgtaClient = newRGTAClient(sess)
return e.rgtaClient, nil
} }
func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
...@@ -279,26 +305,44 @@ func (e *cloudWatchExecutor) getDSInfo(region string) *datasourceInfo { ...@@ -279,26 +305,44 @@ func (e *cloudWatchExecutor) getDSInfo(region string) *datasourceInfo {
} }
} }
func retrieveLogsClient(dsInfo *datasourceInfo) (*cloudwatchlogs.CloudWatchLogs, error) { func isTerminated(queryStatus string) bool {
cfg, err := getAwsConfig(dsInfo) return queryStatus == "Complete" || queryStatus == "Cancelled" || queryStatus == "Failed" || queryStatus == "Timeout"
if err != nil { }
return nil, err
}
sess, err := newSession(cfg) // CloudWatch client factory.
if err != nil { //
return nil, err // Stubbable by tests.
} var newCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI {
client := cloudwatch.New(sess)
client.Handlers.Send.PushFront(func(r *request.Request) {
r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
})
client := cloudwatchlogs.New(sess, cfg) return client
}
// CloudWatch logs client factory.
//
// Stubbable by tests.
var newCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI {
client := cloudwatchlogs.New(sess)
client.Handlers.Send.PushFront(func(r *request.Request) { client.Handlers.Send.PushFront(func(r *request.Request) {
r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion)) r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
}) })
return client, nil return client
} }
func isTerminated(queryStatus string) bool { // EC2 client factory.
return queryStatus == "Complete" || queryStatus == "Cancelled" || queryStatus == "Failed" || queryStatus == "Timeout" //
// Stubbable by tests.
var newEC2Client = func(provider client.ConfigProvider) ec2iface.EC2API {
return ec2.New(provider)
}
// RGTA client factory.
//
// Stubbable by tests.
var newRGTAClient = func(provider client.ConfigProvider) resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI {
return resourcegroupstaggingapi.New(provider)
} }
...@@ -174,17 +174,3 @@ func ecsCredProvider(sess *session.Session, uri string) credentials.Provider { ...@@ -174,17 +174,3 @@ func ecsCredProvider(sess *session.Session, uri string) credentials.Provider {
func ec2RoleProvider(sess client.ConfigProvider) credentials.Provider { func ec2RoleProvider(sess client.ConfigProvider) credentials.Provider {
return &ec2rolecreds.EC2RoleProvider{Client: newEC2Metadata(sess), ExpiryWindow: 5 * time.Minute} return &ec2rolecreds.EC2RoleProvider{Client: newEC2Metadata(sess), ExpiryWindow: 5 * time.Minute}
} }
func getAwsConfig(dsInfo *datasourceInfo) (*aws.Config, error) {
creds, err := getCredentials(dsInfo)
if err != nil {
return nil, err
}
cfg := &aws.Config{
Region: aws.String(dsInfo.Region),
Credentials: creds,
}
return cfg, nil
}
package cloudwatch
import (
"context"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
)
type FakeLogsClient struct {
cloudwatchlogsiface.CloudWatchLogsAPI
Config aws.Config
}
func (f FakeLogsClient) DescribeLogGroupsWithContext(ctx context.Context, input *cloudwatchlogs.DescribeLogGroupsInput, option ...request.Option) (*cloudwatchlogs.DescribeLogGroupsOutput, error) {
return &cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{
{
LogGroupName: aws.String("group_a"),
},
{
LogGroupName: aws.String("group_b"),
},
{
LogGroupName: aws.String("group_c"),
},
},
}, nil
}
func (f FakeLogsClient) GetLogGroupFieldsWithContext(ctx context.Context, input *cloudwatchlogs.GetLogGroupFieldsInput, option ...request.Option) (*cloudwatchlogs.GetLogGroupFieldsOutput, error) {
return &cloudwatchlogs.GetLogGroupFieldsOutput{
LogGroupFields: []*cloudwatchlogs.LogGroupField{
{
Name: aws.String("field_a"),
Percent: aws.Int64(100),
},
{
Name: aws.String("field_b"),
Percent: aws.Int64(30),
},
{
Name: aws.String("field_c"),
Percent: aws.Int64(55),
},
},
}, nil
}
func (f FakeLogsClient) StartQueryWithContext(ctx context.Context, input *cloudwatchlogs.StartQueryInput, option ...request.Option) (*cloudwatchlogs.StartQueryOutput, error) {
return &cloudwatchlogs.StartQueryOutput{
QueryId: aws.String("abcd-efgh-ijkl-mnop"),
}, nil
}
func (f FakeLogsClient) StopQueryWithContext(ctx context.Context, input *cloudwatchlogs.StopQueryInput, option ...request.Option) (*cloudwatchlogs.StopQueryOutput, error) {
return &cloudwatchlogs.StopQueryOutput{
Success: aws.Bool(true),
}, nil
}
func (f FakeLogsClient) GetQueryResultsWithContext(ctx context.Context, input *cloudwatchlogs.GetQueryResultsInput, option ...request.Option) (*cloudwatchlogs.GetQueryResultsOutput, error) {
return &cloudwatchlogs.GetQueryResultsOutput{
Results: [][]*cloudwatchlogs.ResultField{
{
{
Field: aws.String("@timestamp"),
Value: aws.String("2020-03-20 10:37:23.000"),
},
{
Field: aws.String("field_b"),
Value: aws.String("b_1"),
},
{
Field: aws.String("@ptr"),
Value: aws.String("abcdefg"),
},
},
{
{
Field: aws.String("@timestamp"),
Value: aws.String("2020-03-20 10:40:43.000"),
},
{
Field: aws.String("field_b"),
Value: aws.String("b_2"),
},
{
Field: aws.String("@ptr"),
Value: aws.String("hijklmnop"),
},
},
},
Statistics: &cloudwatchlogs.QueryStatistics{
BytesScanned: aws.Float64(512),
RecordsMatched: aws.Float64(256),
RecordsScanned: aws.Float64(1024),
},
Status: aws.String("Complete"),
}, nil
}
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awsutil" "github.com/aws/aws-sdk-go/aws/awsutil"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi" "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
...@@ -310,8 +309,7 @@ func parseMultiSelectValue(input string) []string { ...@@ -310,8 +309,7 @@ func parseMultiSelectValue(input string) []string {
// Whenever this list is updated, the frontend list should also be updated. // Whenever this list is updated, the frontend list should also be updated.
// Please update the region list in public/app/plugins/datasource/cloudwatch/partials/config.html // Please update the region list in public/app/plugins/datasource/cloudwatch/partials/config.html
func (e *cloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { func (e *cloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
const region = "default" dsInfo := e.getDSInfo(defaultRegion)
dsInfo := e.getDSInfo(region)
profile := dsInfo.Profile profile := dsInfo.Profile
if cache, ok := regionCache.Load(profile); ok { if cache, ok := regionCache.Load(profile); ok {
if cache2, ok2 := cache.([]suggestData); ok2 { if cache2, ok2 := cache.([]suggestData); ok2 {
...@@ -319,12 +317,12 @@ func (e *cloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *s ...@@ -319,12 +317,12 @@ func (e *cloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *s
} }
} }
err := e.ensureClientSession("default") client, err := e.getEC2Client(defaultRegion)
if err != nil { if err != nil {
return nil, err return nil, err
} }
regions := knownRegions regions := knownRegions
r, err := e.ec2Svc.DescribeRegions(&ec2.DescribeRegionsInput{}) r, err := client.DescribeRegions(&ec2.DescribeRegionsInput{})
if err != nil { if err != nil {
// ignore error for backward compatibility // ignore error for backward compatibility
plog.Error("Failed to get regions", "error", err) plog.Error("Failed to get regions", "error", err)
...@@ -389,7 +387,7 @@ func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *s ...@@ -389,7 +387,7 @@ func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *s
dsInfo := e.getDSInfo(region) dsInfo := e.getDSInfo(region)
dsInfo.Namespace = namespace dsInfo.Namespace = namespace
if namespaceMetrics, err = e.getMetricsForCustomMetrics(region, e.getAllMetrics); err != nil { if namespaceMetrics, err = e.getMetricsForCustomMetrics(region); err != nil {
return nil, errors.New("Unable to call AWS API") return nil, errors.New("Unable to call AWS API")
} }
} }
...@@ -418,7 +416,7 @@ func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters ...@@ -418,7 +416,7 @@ func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters
dsInfo := e.getDSInfo(region) dsInfo := e.getDSInfo(region)
dsInfo.Namespace = namespace dsInfo.Namespace = namespace
if dimensionValues, err = e.getDimensionsForCustomMetrics(region, e.getAllMetrics); err != nil { if dimensionValues, err = e.getDimensionsForCustomMetrics(region); err != nil {
return nil, errors.New("Unable to call AWS API") return nil, errors.New("Unable to call AWS API")
} }
} }
...@@ -483,31 +481,10 @@ func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, param ...@@ -483,31 +481,10 @@ func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, param
return result, nil return result, nil
} }
func (e *cloudWatchExecutor) ensureClientSession(region string) error {
if e.ec2Svc == nil {
dsInfo := e.getDSInfo(region)
cfg, err := getAwsConfig(dsInfo)
if err != nil {
return fmt.Errorf("Failed to call ec2:getAwsConfig, %w", err)
}
sess, err := session.NewSession(cfg)
if err != nil {
return fmt.Errorf("Failed to call ec2:NewSession, %w", err)
}
e.ec2Svc = ec2.New(sess, cfg)
}
return nil
}
func (e *cloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { func (e *cloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
region := parameters.Get("region").MustString() region := parameters.Get("region").MustString()
instanceId := parameters.Get("instanceId").MustString() instanceId := parameters.Get("instanceId").MustString()
err := e.ensureClientSession(region)
if err != nil {
return nil, err
}
instanceIds := aws.StringSlice(parseMultiSelectValue(instanceId)) instanceIds := aws.StringSlice(parseMultiSelectValue(instanceId))
instances, err := e.ec2DescribeInstances(region, nil, instanceIds) instances, err := e.ec2DescribeInstances(region, nil, instanceIds)
if err != nil { if err != nil {
...@@ -547,11 +524,6 @@ func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, ...@@ -547,11 +524,6 @@ func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context,
} }
} }
err := e.ensureClientSession(region)
if err != nil {
return nil, err
}
instances, err := e.ec2DescribeInstances(region, filters, nil) instances, err := e.ec2DescribeInstances(region, filters, nil)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -610,32 +582,11 @@ func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, ...@@ -610,32 +582,11 @@ func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context,
return result, nil return result, nil
} }
func (e *cloudWatchExecutor) ensureRGTAClientSession(region string) error {
if e.rgtaSvc == nil {
dsInfo := e.getDSInfo(region)
cfg, err := getAwsConfig(dsInfo)
if err != nil {
return fmt.Errorf("Failed to call ec2:getAwsConfig, %w", err)
}
sess, err := session.NewSession(cfg)
if err != nil {
return fmt.Errorf("Failed to call ec2:NewSession, %w", err)
}
e.rgtaSvc = resourcegroupstaggingapi.New(sess, cfg)
}
return nil
}
func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
region := parameters.Get("region").MustString() region := parameters.Get("region").MustString()
resourceType := parameters.Get("resourceType").MustString() resourceType := parameters.Get("resourceType").MustString()
filterJson := parameters.Get("tags").MustMap() filterJson := parameters.Get("tags").MustMap()
err := e.ensureRGTAClientSession(region)
if err != nil {
return nil, err
}
var filters []*resourcegroupstaggingapi.TagFilter var filters []*resourcegroupstaggingapi.TagFilter
for k, v := range filterJson { for k, v := range filterJson {
if vv, ok := v.([]interface{}); ok { if vv, ok := v.([]interface{}); ok {
...@@ -706,8 +657,13 @@ func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2. ...@@ -706,8 +657,13 @@ func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2.
InstanceIds: instanceIds, InstanceIds: instanceIds,
} }
client, err := e.getEC2Client(region)
if err != nil {
return nil, err
}
var resp ec2.DescribeInstancesOutput var resp ec2.DescribeInstancesOutput
if err := e.ec2Svc.DescribeInstancesPages(params, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool { if err := client.DescribeInstancesPages(params, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
resp.Reservations = append(resp.Reservations, page.Reservations...) resp.Reservations = append(resp.Reservations, page.Reservations...)
return !lastPage return !lastPage
}); err != nil { }); err != nil {
...@@ -724,8 +680,13 @@ func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters [ ...@@ -724,8 +680,13 @@ func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters [
TagFilters: filters, TagFilters: filters,
} }
client, err := e.getRGTAClient(region)
if err != nil {
return nil, err
}
var resp resourcegroupstaggingapi.GetResourcesOutput var resp resourcegroupstaggingapi.GetResourcesOutput
if err := e.rgtaSvc.GetResourcesPages(params, if err := client.GetResourcesPages(params,
func(page *resourcegroupstaggingapi.GetResourcesOutput, lastPage bool) bool { func(page *resourcegroupstaggingapi.GetResourcesOutput, lastPage bool) bool {
resp.ResourceTagMappingList = append(resp.ResourceTagMappingList, page.ResourceTagMappingList...) resp.ResourceTagMappingList = append(resp.ResourceTagMappingList, page.ResourceTagMappingList...)
return !lastPage return !lastPage
...@@ -737,27 +698,18 @@ func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters [ ...@@ -737,27 +698,18 @@ func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters [
} }
func (e *cloudWatchExecutor) getAllMetrics(region string) (cloudwatch.ListMetricsOutput, error) { func (e *cloudWatchExecutor) getAllMetrics(region string) (cloudwatch.ListMetricsOutput, error) {
dsInfo := e.getDSInfo(region) client, err := e.getCWClient(region)
creds, err := getCredentials(dsInfo)
if err != nil { if err != nil {
return cloudwatch.ListMetricsOutput{}, err return cloudwatch.ListMetricsOutput{}, err
} }
cfg := &aws.Config{
Region: aws.String(dsInfo.Region),
Credentials: creds,
}
sess, err := session.NewSession(cfg)
if err != nil {
return cloudwatch.ListMetricsOutput{}, err
}
svc := cloudwatch.New(sess, cfg)
dsInfo := e.getDSInfo(region)
params := &cloudwatch.ListMetricsInput{ params := &cloudwatch.ListMetricsInput{
Namespace: aws.String(dsInfo.Namespace), Namespace: aws.String(dsInfo.Namespace),
} }
var resp cloudwatch.ListMetricsOutput var resp cloudwatch.ListMetricsOutput
err = svc.ListMetricsPages(params, func(page *cloudwatch.ListMetricsOutput, lastPage bool) bool { err = client.ListMetricsPages(params, func(page *cloudwatch.ListMetricsOutput, lastPage bool) bool {
metrics.MAwsCloudWatchListMetrics.Inc() metrics.MAwsCloudWatchListMetrics.Inc()
metrics, err := awsutil.ValuesAtPath(page, "Metrics") metrics, err := awsutil.ValuesAtPath(page, "Metrics")
if err != nil { if err != nil {
...@@ -769,12 +721,13 @@ func (e *cloudWatchExecutor) getAllMetrics(region string) (cloudwatch.ListMetric ...@@ -769,12 +721,13 @@ func (e *cloudWatchExecutor) getAllMetrics(region string) (cloudwatch.ListMetric
} }
return !lastPage return !lastPage
}) })
return resp, err return resp, err
} }
var metricsCacheLock sync.Mutex var metricsCacheLock sync.Mutex
func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region string, getAllMetrics func(string) (cloudwatch.ListMetricsOutput, error)) ([]string, error) { func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region string) ([]string, error) {
metricsCacheLock.Lock() metricsCacheLock.Lock()
defer metricsCacheLock.Unlock() defer metricsCacheLock.Unlock()
...@@ -794,7 +747,7 @@ func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region string, getAllMet ...@@ -794,7 +747,7 @@ func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region string, getAllMet
if customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Expire.After(time.Now()) { if customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Expire.After(time.Now()) {
return customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Cache, nil return customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Cache, nil
} }
result, err := getAllMetrics(region) result, err := e.getAllMetrics(region)
if err != nil { if err != nil {
return []string{}, err return []string{}, err
} }
...@@ -813,7 +766,7 @@ func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region string, getAllMet ...@@ -813,7 +766,7 @@ func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region string, getAllMet
var dimensionsCacheLock sync.Mutex var dimensionsCacheLock sync.Mutex
func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region string, getAllMetrics func(string) (cloudwatch.ListMetricsOutput, error)) ([]string, error) { func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region string) ([]string, error) {
dimensionsCacheLock.Lock() dimensionsCacheLock.Lock()
defer dimensionsCacheLock.Unlock() defer dimensionsCacheLock.Unlock()
...@@ -833,7 +786,7 @@ func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region string, getAll ...@@ -833,7 +786,7 @@ func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region string, getAll
if customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Expire.After(time.Now()) { if customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Expire.After(time.Now()) {
return customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Cache, nil return customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Cache, nil
} }
result, err := getAllMetrics(region) result, err := e.getAllMetrics(region)
if err != nil { if err != nil {
return []string{}, err return []string{}, err
} }
......
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
func TestQueryTransformer(t *testing.T) { func TestQueryTransformer(t *testing.T) {
Convey("TestQueryTransformer", t, func() { Convey("TestQueryTransformer", t, func() {
Convey("when transforming queries", func() { Convey("when transforming queries", func() {
executor := &cloudWatchExecutor{} executor := newExecutor()
Convey("one cloudwatchQuery is generated when its request query has one stat", func() { Convey("one cloudwatchQuery is generated when its request query has one stat", func() {
requestQueries := []*requestQuery{ requestQueries := []*requestQuery{
{ {
......
package cloudwatch
import (
"context"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana/pkg/components/securejsondata"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
)
func fakeDataSource() *models.DataSource {
jsonData := simplejson.New()
jsonData.Set("defaultRegion", "default")
return &models.DataSource{
Id: 1,
Database: "default",
JsonData: jsonData,
SecureJsonData: securejsondata.SecureJsonData{},
}
}
type fakeCWLogsClient struct {
cloudwatchlogsiface.CloudWatchLogsAPI
logGroups cloudwatchlogs.DescribeLogGroupsOutput
logGroupFields cloudwatchlogs.GetLogGroupFieldsOutput
queryResults cloudwatchlogs.GetQueryResultsOutput
}
func (m fakeCWLogsClient) GetQueryResultsWithContext(ctx context.Context, input *cloudwatchlogs.GetQueryResultsInput, option ...request.Option) (*cloudwatchlogs.GetQueryResultsOutput, error) {
return &m.queryResults, nil
}
func (m fakeCWLogsClient) StartQueryWithContext(ctx context.Context, input *cloudwatchlogs.StartQueryInput, option ...request.Option) (*cloudwatchlogs.StartQueryOutput, error) {
return &cloudwatchlogs.StartQueryOutput{
QueryId: aws.String("abcd-efgh-ijkl-mnop"),
}, nil
}
func (m fakeCWLogsClient) StopQueryWithContext(ctx context.Context, input *cloudwatchlogs.StopQueryInput, option ...request.Option) (*cloudwatchlogs.StopQueryOutput, error) {
return &cloudwatchlogs.StopQueryOutput{
Success: aws.Bool(true),
}, nil
}
func (m fakeCWLogsClient) DescribeLogGroupsWithContext(ctx context.Context, input *cloudwatchlogs.DescribeLogGroupsInput, option ...request.Option) (*cloudwatchlogs.DescribeLogGroupsOutput, error) {
return &m.logGroups, nil
}
func (m fakeCWLogsClient) GetLogGroupFieldsWithContext(ctx context.Context, input *cloudwatchlogs.GetLogGroupFieldsInput, option ...request.Option) (*cloudwatchlogs.GetLogGroupFieldsOutput, error) {
return &m.logGroupFields, nil
}
type fakeCWClient struct {
cloudwatchiface.CloudWatchAPI
metrics []*cloudwatch.Metric
}
func (c fakeCWClient) ListMetricsPages(input *cloudwatch.ListMetricsInput, fn func(*cloudwatch.ListMetricsOutput, bool) bool) error {
fn(&cloudwatch.ListMetricsOutput{
Metrics: c.metrics,
}, true)
return nil
}
type fakeEC2Client struct {
ec2iface.EC2API
regions []string
reservations []*ec2.Reservation
}
func (c fakeEC2Client) DescribeRegions(*ec2.DescribeRegionsInput) (*ec2.DescribeRegionsOutput, error) {
regions := []*ec2.Region{}
for _, region := range c.regions {
regions = append(regions, &ec2.Region{
RegionName: aws.String(region),
})
}
return &ec2.DescribeRegionsOutput{
Regions: regions,
}, nil
}
func (c fakeEC2Client) DescribeInstancesPages(in *ec2.DescribeInstancesInput,
fn func(*ec2.DescribeInstancesOutput, bool) bool) error {
reservations := []*ec2.Reservation{}
for _, r := range c.reservations {
instances := []*ec2.Instance{}
for _, inst := range r.Instances {
if len(in.InstanceIds) == 0 {
instances = append(instances, inst)
continue
}
for _, id := range in.InstanceIds {
if *inst.InstanceId == *id {
instances = append(instances, inst)
}
}
}
reservation := &ec2.Reservation{Instances: instances}
reservations = append(reservations, reservation)
}
fn(&ec2.DescribeInstancesOutput{
Reservations: reservations,
}, true)
return nil
}
type fakeRGTAClient struct {
resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
tagMapping []*resourcegroupstaggingapi.ResourceTagMapping
}
func (c fakeRGTAClient) GetResourcesPages(in *resourcegroupstaggingapi.GetResourcesInput,
fn func(*resourcegroupstaggingapi.GetResourcesOutput, bool) bool) error {
fn(&resourcegroupstaggingapi.GetResourcesOutput{
ResourceTagMappingList: c.tagMapping,
}, true)
return nil
}
...@@ -9,7 +9,7 @@ import ( ...@@ -9,7 +9,7 @@ import (
) )
func TestTimeSeriesQuery(t *testing.T) { func TestTimeSeriesQuery(t *testing.T) {
executor := &cloudWatchExecutor{} executor := newExecutor()
t.Run("End time before start time should result in error", func(t *testing.T) { t.Run("End time before start time should result in error", func(t *testing.T) {
_, err := executor.executeTimeSeriesQuery(context.TODO(), &tsdb.TsdbQuery{TimeRange: tsdb.NewTimeRange("now-1h", "now-2h")}) _, err := executor.executeTimeSeriesQuery(context.TODO(), &tsdb.TsdbQuery{TimeRange: tsdb.NewTimeRange("now-1h", "now-2h")})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment