Commit f9281742 by Hugo Häggmark Committed by GitHub

CloudMonitoring: Support request cancellation properly (#28847)

parent 294770f4
import { of } from 'rxjs';
import Api from './api';
import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__
import { SelectableValue } from '@grafana/data';
import { createFetchResponse } from 'test/helpers/createFetchResponse';
jest.mock('@grafana/runtime', () => ({
...((jest.requireActual('@grafana/runtime') as unknown) as object),
......@@ -12,58 +14,60 @@ const response = [
{ label: 'test2', value: 'test2' },
];
describe('api', () => {
const datasourceRequestMock = jest.spyOn(backendSrv, 'datasourceRequest');
beforeEach(() => {
datasourceRequestMock.mockImplementation((options: any) => {
const data = { [options.url.match(/([^\/]*)\/*$/)[1]]: response };
return Promise.resolve({ data, status: 200 });
});
type Args = { path?: string; options?: any; response?: any; cache?: any };
async function getTestContext({ path = 'some-resource', options = {}, response = {}, cache }: Args = {}) {
jest.clearAllMocks();
const fetchMock = jest.spyOn(backendSrv, 'fetch');
fetchMock.mockImplementation((options: any) => {
const data = { [options.url.match(/([^\/]*)\/*$/)[1]]: response };
return of(createFetchResponse(data));
});
const api = new Api('/cloudmonitoring/');
if (cache) {
api.cache[path] = cache;
}
const res = await api.get(path, options);
return { res, api, fetchMock };
}
describe('api', () => {
describe('when resource was cached', () => {
let api: Api;
let res: Array<SelectableValue<string>>;
beforeEach(async () => {
api = new Api('/cloudmonitoring/');
api.cache['some-resource'] = response;
res = await api.get('some-resource');
});
it('should return cached value and not load from source', async () => {
const path = 'some-resource';
const { res, api, fetchMock } = await getTestContext({ path, cache: response });
it('should return cached value and not load from source', () => {
expect(res).toEqual(response);
expect(api.cache['some-resource']).toEqual(response);
expect(datasourceRequestMock).not.toHaveBeenCalled();
expect(api.cache[path]).toEqual(response);
expect(fetchMock).not.toHaveBeenCalled();
});
});
describe('when resource was not cached', () => {
let api: Api;
let res: Array<SelectableValue<string>>;
beforeEach(async () => {
api = new Api('/cloudmonitoring/');
res = await api.get('some-resource');
});
it('should return from source and not from cache', async () => {
const path = 'some-resource';
const { res, api, fetchMock } = await getTestContext({ path, response });
it('should return cached value and not load from source', () => {
expect(res).toEqual(response);
expect(api.cache['some-resource']).toEqual(response);
expect(datasourceRequestMock).toHaveBeenCalled();
expect(api.cache[path]).toEqual(response);
expect(fetchMock).toHaveBeenCalled();
});
});
describe('when cache should be bypassed', () => {
let api: Api;
let res: Array<SelectableValue<string>>;
beforeEach(async () => {
api = new Api('/cloudmonitoring/');
api.cache['some-resource'] = response;
res = await api.get('some-resource', { useCache: false });
});
it('should return from source and not from cache', async () => {
const options = { useCache: false };
const path = 'some-resource';
const { res, fetchMock } = await getTestContext({ path, response, cache: response, options });
it('should return cached value and not load from source', () => {
expect(res).toEqual(response);
expect(datasourceRequestMock).toHaveBeenCalled();
expect(fetchMock).toHaveBeenCalled();
});
});
});
import appEvents from 'app/core/app_events';
import { CoreEvents } from 'app/types';
import { Observable, of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import { SelectableValue } from '@grafana/data';
import { getBackendSrv } from '@grafana/runtime';
import { FetchResponse, getBackendSrv } from '@grafana/runtime';
import appEvents from 'app/core/app_events';
import { CoreEvents } from 'app/types';
import { formatCloudMonitoringError } from './functions';
import { MetricDescriptor } from './types';
export interface PostResponse {
results: Record<string, any>;
}
interface Options {
responseMap?: (res: any) => SelectableValue<string> | MetricDescriptor;
baseUrl?: string;
......@@ -25,48 +31,56 @@ export default class Api {
};
}
async get(path: string, options?: Options): Promise<Array<SelectableValue<string>> | MetricDescriptor[]> {
try {
const { useCache, responseMap, baseUrl } = { ...this.defaultOptions, ...options };
get(path: string, options?: Options): Promise<Array<SelectableValue<string>> | MetricDescriptor[]> {
const { useCache, responseMap, baseUrl } = { ...this.defaultOptions, ...options };
if (useCache && this.cache[path]) {
return this.cache[path];
}
if (useCache && this.cache[path]) {
return Promise.resolve(this.cache[path]);
}
const response = await getBackendSrv().datasourceRequest({
return getBackendSrv()
.fetch<Record<string, any>>({
url: baseUrl + path,
method: 'GET',
});
const responsePropName = path.match(/([^\/]*)\/*$/)![1];
let res = [];
if (response && response.data && response.data[responsePropName]) {
res = response.data[responsePropName].map(responseMap);
}
})
.pipe(
map(response => {
const responsePropName = path.match(/([^\/]*)\/*$/)![1];
let res = [];
if (response && response.data && response.data[responsePropName]) {
res = response.data[responsePropName].map(responseMap);
}
if (useCache) {
this.cache[path] = res;
}
if (useCache) {
this.cache[path] = res;
}
return res;
} catch (error) {
appEvents.emit(CoreEvents.dsRequestError, { error: { data: { error: formatCloudMonitoringError(error) } } });
return [];
}
return res;
}),
catchError(error => {
appEvents.emit(CoreEvents.dsRequestError, {
error: { data: { error: formatCloudMonitoringError(error) } },
});
return of([]);
})
)
.toPromise();
}
async post(data: { [key: string]: any }) {
return getBackendSrv().datasourceRequest({
post(data: Record<string, any>): Observable<FetchResponse<PostResponse>> {
return getBackendSrv().fetch<PostResponse>({
url: '/api/tsdb/query',
method: 'POST',
data,
});
}
async test(projectName: string) {
return getBackendSrv().datasourceRequest({
url: `${this.baseUrl}${projectName}/metricDescriptors`,
method: 'GET',
});
test(projectName: string) {
return getBackendSrv()
.fetch<any>({
url: `${this.baseUrl}${projectName}/metricDescriptors`,
method: 'GET',
})
.toPromise();
}
}
......@@ -14,8 +14,10 @@ import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { CloudMonitoringOptions, CloudMonitoringQuery, Filter, MetricDescriptor, QueryType } from './types';
import { cloudMonitoringUnitMappings } from './constants';
import API from './api';
import API, { PostResponse } from './api';
import { CloudMonitoringVariableSupport } from './variables';
import { catchError, map, mergeMap } from 'rxjs/operators';
import { from, Observable, of, throwError } from 'rxjs';
export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonitoringQuery, CloudMonitoringOptions> {
api: API;
......@@ -37,45 +39,52 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
return this.templateSrv.getVariables().map(v => `$${v.name}`);
}
async query(options: DataQueryRequest<CloudMonitoringQuery>): Promise<DataQueryResponseData> {
const result: DataQueryResponseData[] = [];
const data = await this.getTimeSeries(options);
if (data.results) {
Object.values(data.results).forEach((queryRes: any) => {
if (!queryRes.series) {
return;
query(options: DataQueryRequest<CloudMonitoringQuery>): Observable<DataQueryResponseData> {
return this.getTimeSeries(options).pipe(
map(data => {
if (!data.results) {
return { data: [] };
}
const unit = this.resolvePanelUnitFromTargets(options.targets);
queryRes.series.forEach((series: any) => {
let timeSerie: any = {
target: series.name,
datapoints: series.points,
refId: queryRes.refId,
meta: queryRes.meta,
};
if (unit) {
timeSerie = { ...timeSerie, unit };
const result: DataQueryResponseData[] = [];
const values = Object.values(data.results);
for (const queryRes of values) {
if (!queryRes.series) {
continue;
}
const df = toDataFrame(timeSerie);
for (const field of df.fields) {
if (queryRes.meta?.deepLink && queryRes.meta?.deepLink.length > 0) {
field.config.links = [
{
url: queryRes.meta?.deepLink,
title: 'View in Metrics Explorer',
targetBlank: true,
},
];
const unit = this.resolvePanelUnitFromTargets(options.targets);
for (const series of queryRes.series) {
let timeSerie: any = {
target: series.name,
datapoints: series.points,
refId: queryRes.refId,
meta: queryRes.meta,
};
if (unit) {
timeSerie = { ...timeSerie, unit };
}
const df = toDataFrame(timeSerie);
for (const field of df.fields) {
if (queryRes.meta?.deepLink && queryRes.meta?.deepLink.length > 0) {
field.config.links = [
{
url: queryRes.meta?.deepLink,
title: 'View in Metrics Explorer',
targetBlank: true,
},
];
}
}
result.push(df);
}
result.push(df);
});
});
return { data: result };
} else {
return { data: [] };
}
}
return { data: result };
})
);
}
async annotationQuery(options: any) {
......@@ -101,47 +110,57 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
},
];
const { data } = await this.api.post({
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),
queries,
});
const results = data.results['annotationQuery'].tables[0].rows.map((v: any) => {
return {
annotation: annotation,
time: Date.parse(v[0]),
title: v[1],
tags: [],
text: v[3],
} as any;
});
return results;
return this.api
.post({
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),
queries,
})
.pipe(
map(({ data }) => {
const results = data.results['annotationQuery'].tables[0].rows.map((v: any) => {
return {
annotation: annotation,
time: Date.parse(v[0]),
title: v[1],
tags: [],
text: v[3],
} as any;
});
return results;
})
)
.toPromise();
}
async getTimeSeries(options: DataQueryRequest<CloudMonitoringQuery>) {
await this.ensureGCEDefaultProject();
getTimeSeries(options: DataQueryRequest<CloudMonitoringQuery>): Observable<PostResponse> {
const queries = options.targets
.map(this.migrateQuery)
.filter(this.shouldRunQuery)
.map(q => this.prepareTimeSeriesQuery(q, options.scopedVars))
.map(q => ({ ...q, intervalMs: options.intervalMs, type: 'timeSeriesQuery' }));
if (queries.length > 0) {
const { data } = await this.api.post({
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),
queries,
});
return data;
} else {
return { results: [] };
if (!queries.length) {
return of({ results: [] });
}
return from(this.ensureGCEDefaultProject()).pipe(
mergeMap(() => {
return this.api.post({
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),
queries,
});
}),
map(({ data }) => {
return data;
})
);
}
async getLabels(metricType: string, refId: string, projectName: string, groupBys?: string[]) {
const response = await this.getTimeSeries({
return this.getTimeSeries({
targets: [
{
refId,
......@@ -157,9 +176,14 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
},
],
range: this.timeSrv.timeRange(),
} as DataQueryRequest<CloudMonitoringQuery>);
const result = response.results[refId];
return result && result.meta ? result.meta.labels : {};
} as DataQueryRequest<CloudMonitoringQuery>)
.pipe(
map(response => {
const result = response.results[refId];
return result && result.meta ? result.meta.labels : {};
})
)
.toPromise();
}
async testDatasource() {
......@@ -205,14 +229,17 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
},
],
})
.then(({ data }) => {
return data && data.results && data.results.getGCEDefaultProject && data.results.getGCEDefaultProject.meta
? data.results.getGCEDefaultProject.meta.defaultProject
: '';
})
.catch(err => {
throw err.data.error;
});
.pipe(
map(({ data }) => {
return data && data.results && data.results.getGCEDefaultProject && data.results.getGCEDefaultProject.meta
? data.results.getGCEDefaultProject.meta.defaultProject
: '';
}),
catchError(err => {
return throwError(err.data.error);
})
)
.toPromise();
}
getDefaultProject(): string {
......@@ -272,7 +299,7 @@ export default class CloudMonitoringDatasource extends DataSourceApi<CloudMonito
});
}
async getProjects() {
getProjects() {
return this.api.get(`projects`, {
responseMap: ({ projectId, name }: { projectId: string; name: string }) => ({
value: projectId,
......
import { FetchResponse } from '@grafana/runtime';
export function createFetchResponse<T>(data: T): FetchResponse<T> {
return {
data,
status: 200,
url: 'http://localhost:3000/api/query',
config: { url: 'http://localhost:3000/api/query' },
type: 'basic',
statusText: 'Ok',
redirected: false,
headers: ({} as unknown) as Headers,
ok: true,
};
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment