Commit 65b0365a by ying-jeanne Committed by GitHub

Cloud Monitoring: Convert datasource to use Dataframes (#29830)

* Convert Cloud Monitoring (Stackdriver) Datasource to use Dataframes #29830

* add deeplink into config

* omggggggggggggggg this deeplink works!

* move unit to the backend part

* remove unit from frontend

* only set the config fields[1] for deeplink and unit

* refactory + fix some test

* remove frontend test for unit

* adding backend test for unit mapping

* resolve review

* rewrtie unit logic to do exactly the same as frontend filter

* refactory
parent 382c75d0
......@@ -15,8 +15,8 @@ import (
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/api/pluginproxy"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
......@@ -39,6 +39,21 @@ var (
metricNameFormat = regexp.MustCompile(`([\w\d_]+)\.(googleapis\.com|io)/(.+)`)
wildcardRegexRe = regexp.MustCompile(`[-\/^$+?.()|[\]{}]`)
alignmentPeriodRe = regexp.MustCompile("[0-9]+")
cloudMonitoringUnitMappings = map[string]string{
"bit": "bits",
"By": "bytes",
"s": "s",
"min": "m",
"h": "h",
"d": "d",
"us": "µs",
"ms": "ms",
"ns": "ns",
"percent": "percent",
"MiBy": "mbytes",
"By/s": "Bps",
"GBy": "decgbytes",
}
)
const (
......@@ -202,17 +217,13 @@ func (e *CloudMonitoringExecutor) executeTimeSeriesQuery(ctx context.Context, ts
return nil, err
}
unit := e.resolvePanelUnitFromQueries(queries)
for _, query := range queries {
queryRes, resp, err := e.executeQuery(ctx, query, tsdbQuery)
if err != nil {
return nil, err
}
err = e.parseResponse(queryRes, resp, query)
if err != nil {
queryRes.Error = err
}
result.Results[query.RefID] = queryRes
resourceType := ""
for _, s := range resp.TimeSeries {
......@@ -221,16 +232,48 @@ func (e *CloudMonitoringExecutor) executeTimeSeriesQuery(ctx context.Context, ts
break
}
query.Params.Set("resourceType", resourceType)
dl := ""
if len(resp.TimeSeries) > 0 {
dl = query.buildDeepLink()
err = e.parseResponse(queryRes, resp, query)
if err != nil {
queryRes.Error = err
}
if len(unit) > 0 {
frames, _ := queryRes.Dataframes.Decoded()
for i := range frames {
if frames[i].Fields[1].Config == nil {
frames[i].Fields[1].Config = &data.FieldConfig{}
}
frames[i].Fields[1].Config.Unit = unit
}
queryRes.Meta.Set("deepLink", dl)
queryRes.Dataframes = tsdb.NewDecodedDataFrames(frames)
}
result.Results[query.RefID] = queryRes
}
return result, nil
}
func (e *CloudMonitoringExecutor) resolvePanelUnitFromQueries(queries []*cloudMonitoringQuery) string {
if len(queries) == 0 {
return ""
}
unit := queries[0].Unit
if len(queries) > 1 {
for _, query := range queries[1:] {
if query.Unit != unit {
return ""
}
}
}
if len(unit) > 0 {
if val, ok := cloudMonitoringUnitMappings[unit]; ok {
return val
}
}
return ""
}
func (e *CloudMonitoringExecutor) buildQueries(tsdbQuery *tsdb.TsdbQuery) ([]*cloudMonitoringQuery, error) {
cloudMonitoringQueries := []*cloudMonitoringQuery{}
......@@ -286,7 +329,7 @@ func (e *CloudMonitoringExecutor) buildQueries(tsdbQuery *tsdb.TsdbQuery) ([]*cl
target = params.Encode()
sq.Target = target
sq.Params = params
sq.Unit = q.MetricQuery.Unit
if setting.Env == setting.Dev {
slog.Debug("CloudMonitoring request", "params", params)
}
......@@ -507,9 +550,8 @@ func (e *CloudMonitoringExecutor) unmarshalResponse(res *http.Response) (cloudMo
}
func handleDistributionSeries(series timeSeries, defaultMetricName string, seriesLabels map[string]string,
query *cloudMonitoringQuery, queryRes *tsdb.QueryResult) {
points := make([]tsdb.TimePoint, 0)
for i := len(series.Points) - 1; i >= 0; i-- {
query *cloudMonitoringQuery, queryRes *tsdb.QueryResult, frame *data.Frame) {
for i := 0; i < len(series.Points); i++ {
point := series.Points[i]
value := point.Value.DoubleValue
......@@ -527,27 +569,27 @@ func handleDistributionSeries(series timeSeries, defaultMetricName string, serie
value = 0
}
}
points = append(points, tsdb.NewTimePoint(null.FloatFrom(value), float64((point.Interval.EndTime).Unix())*1000))
frame.SetRow(len(series.Points)-1-i, point.Interval.EndTime, value)
}
metricName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, nil, query)
queryRes.Series = append(queryRes.Series, &tsdb.TimeSeries{
Name: metricName,
Points: points,
})
dataField := frame.Fields[1]
dataField.Name = metricName
}
func (e *CloudMonitoringExecutor) parseResponse(queryRes *tsdb.QueryResult, data cloudMonitoringResponse, query *cloudMonitoringQuery) error {
func (e *CloudMonitoringExecutor) parseResponse(queryRes *tsdb.QueryResult, cmr cloudMonitoringResponse, query *cloudMonitoringQuery) error {
labels := make(map[string]map[string]bool)
for _, series := range data.TimeSeries {
seriesLabels := make(map[string]string)
frames := data.Frames{}
for _, series := range cmr.TimeSeries {
seriesLabels := data.Labels{}
defaultMetricName := series.Metric.Type
labels["resource.type"] = map[string]bool{series.Resource.Type: true}
seriesLabels["resource.type"] = series.Resource.Type
frame := data.NewFrameOfFieldTypes("", len(series.Points), data.FieldTypeTime, data.FieldTypeFloat64)
frame.RefID = query.RefID
for key, value := range series.Metric.Labels {
if _, ok := labels["metric.label."+key]; !ok {
labels["metric.label."+key] = map[string]bool{}
......@@ -602,10 +644,11 @@ func (e *CloudMonitoringExecutor) parseResponse(queryRes *tsdb.QueryResult, data
// reverse the order to be ascending
if series.ValueType != "DISTRIBUTION" {
handleDistributionSeries(series, defaultMetricName, seriesLabels, query, queryRes)
handleDistributionSeries(
series, defaultMetricName, seriesLabels, query, queryRes, frame)
frames = append(frames, frame)
} else {
buckets := make(map[int]*tsdb.TimeSeries)
buckets := make(map[int]*data.Frame)
for i := len(series.Points) - 1; i >= 0; i-- {
point := series.Points[i]
if len(point.Value.DistributionValue.BucketCounts) == 0 {
......@@ -622,35 +665,57 @@ func (e *CloudMonitoringExecutor) parseResponse(queryRes *tsdb.QueryResult, data
// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#Distribution
bucketBound := calcBucketBound(point.Value.DistributionValue.BucketOptions, i)
additionalLabels := map[string]string{"bucket": bucketBound}
buckets[i] = &tsdb.TimeSeries{
Name: formatLegendKeys(series.Metric.Type, defaultMetricName, nil, additionalLabels, query),
Points: make([]tsdb.TimePoint, 0),
timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{})
valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{})
frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, nil, additionalLabels, query)
valueField.Name = frameName
buckets[i] = &data.Frame{
Name: frameName,
Fields: []*data.Field{
timeField,
valueField,
},
RefID: query.RefID,
}
if maxKey < i {
maxKey = i
}
}
buckets[i].Points = append(buckets[i].Points, tsdb.NewTimePoint(null.FloatFrom(value), float64((point.Interval.EndTime).Unix())*1000))
buckets[i].AppendRow(point.Interval.EndTime, value)
}
// fill empty bucket
for i := 0; i < maxKey; i++ {
if _, ok := buckets[i]; !ok {
bucketBound := calcBucketBound(point.Value.DistributionValue.BucketOptions, i)
additionalLabels := map[string]string{"bucket": bucketBound}
buckets[i] = &tsdb.TimeSeries{
Name: formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, additionalLabels, query),
Points: make([]tsdb.TimePoint, 0),
additionalLabels := data.Labels{"bucket": bucketBound}
timeField := data.NewField(data.TimeSeriesTimeFieldName, nil, []time.Time{})
valueField := data.NewField(data.TimeSeriesValueFieldName, nil, []float64{})
frameName := formatLegendKeys(series.Metric.Type, defaultMetricName, seriesLabels, additionalLabels, query)
valueField.Name = frameName
buckets[i] = &data.Frame{
Name: frameName,
Fields: []*data.Field{
timeField,
valueField,
},
RefID: query.RefID,
}
}
}
}
for i := 0; i < len(buckets); i++ {
queryRes.Series = append(queryRes.Series, buckets[i])
frames = append(frames, buckets[i])
}
}
}
if len(cmr.TimeSeries) > 0 {
frames = addConfigData(frames, query)
}
queryRes.Dataframes = tsdb.NewDecodedDataFrames(frames)
labelsByKey := make(map[string][]string)
for key, values := range labels {
for value := range values {
......@@ -660,10 +725,25 @@ func (e *CloudMonitoringExecutor) parseResponse(queryRes *tsdb.QueryResult, data
queryRes.Meta.Set("labels", labelsByKey)
queryRes.Meta.Set("groupBys", query.GroupBys)
return nil
}
func addConfigData(frames data.Frames, query *cloudMonitoringQuery) data.Frames {
dl := query.buildDeepLink()
for i := range frames {
if frames[i].Fields[1].Config == nil {
frames[i].Fields[1].Config = &data.FieldConfig{}
}
deepLink := data.DataLink{
Title: "View in Metrics Explorer",
TargetBlank: true,
URL: dl,
}
frames[i].Fields[1].Config.Links = append(frames[i].Fields[1].Config.Links, deepLink)
}
return frames
}
func toSnakeCase(str string) string {
return strings.ToLower(matchAllCap.ReplaceAllString(str, "${1}_${2}"))
}
......
......@@ -16,6 +16,7 @@ type (
Selector string
Service string
Slo string
Unit string
}
metricQuery struct {
......@@ -28,6 +29,7 @@ type (
Filters []string
AliasBy string
View string
Unit string
}
sloQuery struct {
......
......@@ -246,22 +246,6 @@ export const alignmentPeriods = [
{ text: '1w', value: '+604800s' },
];
export const cloudMonitoringUnitMappings = {
bit: 'bits',
By: 'bytes',
s: 's',
min: 'm',
h: 'h',
d: 'd',
us: 'µs',
ms: 'ms',
ns: 'ns',
percent: 'percent',
MiBy: 'mbytes',
'By/s': 'Bps',
GBy: 'decgbytes',
};
export const systemLabels = [
'metadata.system_labels.cloud_account',
'metadata.system_labels.name',
......
......@@ -2,26 +2,28 @@ import _ from 'lodash';
import {
DataQueryRequest,
DataQueryResponseData,
DataSourceApi,
DataSourceInstanceSettings,
ScopedVars,
SelectableValue,
toDataFrame,
DataQueryResponse,
} from '@grafana/data';
import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { CloudMonitoringOptions, CloudMonitoringQuery, Filter, MetricDescriptor, QueryType } from './types';
import { cloudMonitoringUnitMappings } from './constants';
import API, { PostResponse } from './api';
import API from './api';
import { DataSourceWithBackend } from '@grafana/runtime';
import { CloudMonitoringVariableSupport } from './variables';
import { catchError, map, mergeMap } from 'rxjs/operators';
import { from, Observable, of, throwError } from 'rxjs';
export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonitoringQuery, CloudMonitoringOptions> {
export default class CloudMonitoringDatasource extends DataSourceWithBackend<
CloudMonitoringQuery,
CloudMonitoringOptions
> {
api: API;
authenticationType: string;
intervalMs: number;
constructor(
private instanceSettings: DataSourceInstanceSettings<CloudMonitoringOptions>,
......@@ -31,7 +33,6 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
super(instanceSettings);
this.authenticationType = instanceSettings.jsonData.authenticationType || 'jwt';
this.api = new API(`${instanceSettings.url!}/cloudmonitoring/v3/projects/`);
this.variables = new CloudMonitoringVariableSupport(this);
}
......@@ -39,52 +40,12 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
return this.templateSrv.getVariables().map(v => `$${v.name}`);
}
query(options: DataQueryRequest<CloudMonitoringQuery>): Observable<DataQueryResponseData> {
return this.getTimeSeries(options).pipe(
map(data => {
if (!data.results) {
return { data: [] };
}
const result: DataQueryResponseData[] = [];
const values = Object.values(data.results);
for (const queryRes of values) {
if (!queryRes.series) {
continue;
}
const unit = this.resolvePanelUnitFromTargets(options.targets);
for (const series of queryRes.series) {
let timeSerie: any = {
target: series.name,
datapoints: series.points,
refId: queryRes.refId,
meta: queryRes.meta,
};
if (unit) {
timeSerie = { ...timeSerie, unit };
}
const df = toDataFrame(timeSerie);
for (const field of df.fields) {
if (queryRes.meta?.deepLink && queryRes.meta?.deepLink.length > 0) {
field.config.links = [
{
url: queryRes.meta?.deepLink,
title: 'View in Metrics Explorer',
targetBlank: true,
},
];
}
}
result.push(df);
}
}
return { data: result };
})
);
query(request: DataQueryRequest<CloudMonitoringQuery>): Observable<DataQueryResponse> {
request.targets = request.targets.map(t => ({
...this.migrateQuery(t),
intervalMs: request.intervalMs,
}));
return super.query(request);
}
async annotationQuery(options: any) {
......@@ -134,33 +95,32 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
.toPromise();
}
getTimeSeries(options: DataQueryRequest<CloudMonitoringQuery>): Observable<PostResponse> {
const queries = options.targets
.map(this.migrateQuery)
.filter(this.shouldRunQuery)
.map(q => this.prepareTimeSeriesQuery(q, options.scopedVars))
.map(q => ({ ...q, intervalMs: options.intervalMs, type: 'timeSeriesQuery' }));
if (!queries.length) {
return of({ results: [] });
}
return from(this.ensureGCEDefaultProject()).pipe(
mergeMap(() => {
return this.api.post({
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),
queries,
});
}),
map(({ data }) => {
return data;
})
);
applyTemplateVariables(
{ metricQuery, refId, queryType, sloQuery }: CloudMonitoringQuery,
scopedVars: ScopedVars
): Record<string, any> {
return {
datasourceId: this.id,
refId,
intervalMs: this.intervalMs,
type: 'timeSeriesQuery',
queryType,
metricQuery: {
...this.interpolateProps(metricQuery, scopedVars),
projectName: this.templateSrv.replace(
metricQuery.projectName ? metricQuery.projectName : this.getDefaultProject(),
scopedVars
),
filters: this.interpolateFilters(metricQuery.filters || [], scopedVars),
groupBys: this.interpolateGroupBys(metricQuery.groupBys || [], scopedVars),
view: metricQuery.view || 'FULL',
},
sloQuery: sloQuery && this.interpolateProps(sloQuery, scopedVars),
};
}
async getLabels(metricType: string, refId: string, projectName: string, groupBys?: string[]) {
return this.getTimeSeries({
const options = {
targets: [
{
refId,
......@@ -176,8 +136,26 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
},
],
range: this.timeSrv.timeRange(),
} as DataQueryRequest<CloudMonitoringQuery>)
} as DataQueryRequest<CloudMonitoringQuery>;
const queries = options.targets;
if (!queries.length) {
return of({ results: [] }).toPromise();
}
return from(this.ensureGCEDefaultProject())
.pipe(
mergeMap(() => {
return this.api.post({
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),
queries,
});
}),
map(({ data }) => {
return data;
}),
map(response => {
const result = response.results[refId];
return result && result.meta ? result.meta.labels : {};
......@@ -311,9 +289,11 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
migrateQuery(query: CloudMonitoringQuery): CloudMonitoringQuery {
if (!query.hasOwnProperty('metricQuery')) {
const { hide, refId, datasource, key, queryType, maxLines, metric, ...rest } = query as any;
const { hide, refId, datasource, key, queryType, maxLines, metric, intervalMs, type, ...rest } = query as any;
return {
refId,
intervalMs,
type,
hide,
queryType: QueryType.METRICS,
metricQuery: {
......@@ -334,7 +314,7 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
}, {} as T);
}
shouldRunQuery(query: CloudMonitoringQuery): boolean {
filterQuery(query: CloudMonitoringQuery): boolean {
if (query.hide) {
return false;
}
......@@ -349,30 +329,8 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
return !!metricType;
}
prepareTimeSeriesQuery(
{ metricQuery, refId, queryType, sloQuery }: CloudMonitoringQuery,
scopedVars: ScopedVars
): CloudMonitoringQuery {
return {
datasourceId: this.id,
refId,
queryType,
metricQuery: {
...this.interpolateProps(metricQuery, scopedVars),
projectName: this.templateSrv.replace(
metricQuery.projectName ? metricQuery.projectName : this.getDefaultProject(),
scopedVars
),
filters: this.interpolateFilters(metricQuery.filters || [], scopedVars),
groupBys: this.interpolateGroupBys(metricQuery.groupBys || [], scopedVars),
view: metricQuery.view || 'FULL',
},
sloQuery: sloQuery && this.interpolateProps(sloQuery, scopedVars),
};
}
interpolateVariablesInQueries(queries: CloudMonitoringQuery[], scopedVars: ScopedVars): CloudMonitoringQuery[] {
return queries.map(query => this.prepareTimeSeriesQuery(query, scopedVars));
return queries.map(query => this.applyTemplateVariables(query, scopedVars) as CloudMonitoringQuery);
}
interpolateFilters(filters: string[], scopedVars: ScopedVars) {
......@@ -409,15 +367,4 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
});
return interpolatedGroupBys;
}
resolvePanelUnitFromTargets(targets: any) {
let unit;
if (targets.length > 0 && targets.every((t: any) => t.unit === targets[0].unit)) {
if (cloudMonitoringUnitMappings.hasOwnProperty(targets[0].unit!)) {
// @ts-ignore
unit = cloudMonitoringUnitMappings[targets[0].unit];
}
}
return unit;
}
}
......@@ -213,51 +213,6 @@ describe('CloudMonitoringDataSource', () => {
});
});
});
describe('unit parsing', () => {
const { ds } = getTestcontext();
describe('when theres only one target', () => {
describe('and the cloud monitoring unit does nott have a corresponding grafana unit', () => {
it('should return undefined', () => {
const res = ds.resolvePanelUnitFromTargets([{ unit: 'megaseconds' }]);
expect(res).toBeUndefined();
});
});
describe('and the cloud monitoring unit has a corresponding grafana unit', () => {
it('should return bits', () => {
const res = ds.resolvePanelUnitFromTargets([{ unit: 'bit' }]);
expect(res).toEqual('bits');
});
});
});
describe('when theres more than one target', () => {
describe('and all target units are the same', () => {
it('should return bits', () => {
const res = ds.resolvePanelUnitFromTargets([{ unit: 'bit' }, { unit: 'bit' }]);
expect(res).toEqual('bits');
});
});
describe('and all target units are the same but does not have grafana mappings', () => {
it('should return the default value of undefined', () => {
const res = ds.resolvePanelUnitFromTargets([{ unit: 'megaseconds' }, { unit: 'megaseconds' }]);
expect(res).toBeUndefined();
});
});
describe('and all target units are not the same', () => {
it('should return the default value of undefined', () => {
const res = ds.resolvePanelUnitFromTargets([{ unit: 'bit' }, { unit: 'min' }]);
expect(res).toBeUndefined();
});
});
});
});
});
function initTemplateSrv(values: any, multi = false) {
......
......@@ -94,6 +94,8 @@ export interface CloudMonitoringQuery extends DataQuery {
queryType: QueryType;
metricQuery: MetricQuery;
sloQuery?: SLOQuery;
intervalMs: number;
type: string;
}
export interface CloudMonitoringOptions extends DataSourceJsonData {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment