Commit b0946211 by Hugo Häggmark Committed by GitHub

OpenTSDB: Support request cancellation properly (#29992)

parent 6dfa9b48
import angular from 'angular';
import _ from 'lodash';
import { dateMath, DataQueryRequest, DataSourceApi, ScopedVars } from '@grafana/data';
import { getBackendSrv } from '@grafana/runtime';
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { FetchResponse, getBackendSrv } from '@grafana/runtime';
import {
AnnotationEvent,
DataQueryRequest,
DataQueryResponse,
DataSourceApi,
dateMath,
ScopedVars,
} from '@grafana/data';
import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
import { OpenTsdbOptions, OpenTsdbQuery } from './types';
......@@ -37,7 +47,7 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
}
// Called once per panel (graph)
query(options: DataQueryRequest<OpenTsdbQuery>) {
query(options: DataQueryRequest<OpenTsdbQuery>): Observable<DataQueryResponse> {
const start = this.convertToTSDBTime(options.range.raw.from, false, options.timezone);
const end = this.convertToTSDBTime(options.range.raw.to, true, options.timezone);
const qs: any[] = [];
......@@ -53,7 +63,7 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
// No valid targets, return the empty result to save a round trip.
if (_.isEmpty(queries)) {
return Promise.resolve({ data: [] });
return of({ data: [] });
}
const groupByTags: any = {};
......@@ -73,22 +83,30 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
return query.hide !== true;
});
return this.performTimeSeriesQuery(queries, start, end).then((response: any) => {
const metricToTargetMapping = this.mapMetricsToTargets(response.data, options, this.tsdbVersion);
const result = _.map(response.data, (metricData: any, index: number) => {
index = metricToTargetMapping[index];
if (index === -1) {
index = 0;
}
this._saveTagKeys(metricData);
return this.transformMetricData(metricData, groupByTags, options.targets[index], options, this.tsdbResolution);
});
return { data: result };
});
return this.performTimeSeriesQuery(queries, start, end).pipe(
map(response => {
const metricToTargetMapping = this.mapMetricsToTargets(response.data, options, this.tsdbVersion);
const result = _.map(response.data, (metricData: any, index: number) => {
index = metricToTargetMapping[index];
if (index === -1) {
index = 0;
}
this._saveTagKeys(metricData);
return this.transformMetricData(
metricData,
groupByTags,
options.targets[index],
options,
this.tsdbResolution
);
});
return { data: result };
})
);
}
annotationQuery(options: any) {
annotationQuery(options: any): Promise<AnnotationEvent[]> {
const start = this.convertToTSDBTime(options.rangeRaw.from, false, options.timezone);
const end = this.convertToTSDBTime(options.rangeRaw.to, true, options.timezone);
const qs = [];
......@@ -98,26 +116,30 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
const queries = _.compact(qs);
return this.performTimeSeriesQuery(queries, start, end).then((results: any) => {
if (results.data[0]) {
let annotationObject = results.data[0].annotations;
if (options.annotation.isGlobal) {
annotationObject = results.data[0].globalAnnotations;
}
if (annotationObject) {
_.each(annotationObject, annotation => {
const event = {
text: annotation.description,
time: Math.floor(annotation.startTime) * 1000,
annotation: options.annotation,
};
eventList.push(event);
});
}
}
return eventList;
});
return this.performTimeSeriesQuery(queries, start, end)
.pipe(
map(results => {
if (results.data[0]) {
let annotationObject = results.data[0].annotations;
if (options.annotation.isGlobal) {
annotationObject = results.data[0].globalAnnotations;
}
if (annotationObject) {
_.each(annotationObject, annotation => {
const event = {
text: annotation.description,
time: Math.floor(annotation.startTime) * 1000,
annotation: options.annotation,
};
eventList.push(event);
});
}
}
return eventList;
})
)
.toPromise();
}
targetContainsTemplate(target: any) {
......@@ -140,7 +162,7 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
return false;
}
performTimeSeriesQuery(queries: any[], start: any, end: any) {
performTimeSeriesQuery(queries: any[], start: any, end: any): Observable<FetchResponse> {
let msResolution = false;
if (this.tsdbResolution === 2) {
msResolution = true;
......@@ -167,7 +189,7 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
};
this._addCredentialOptions(options);
return getBackendSrv().datasourceRequest(options);
return getBackendSrv().fetch(options);
}
suggestTagKeys(metric: string | number) {
......@@ -183,15 +205,17 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
this.tagKeys[metricData.metric] = tagKeys;
}
_performSuggestQuery(query: string, type: string) {
return this._get('/api/suggest', { type, q: query, max: this.lookupLimit }).then((result: any) => {
return result.data;
});
_performSuggestQuery(query: string, type: string): Observable<any> {
return this._get('/api/suggest', { type, q: query, max: this.lookupLimit }).pipe(
map((result: any) => {
return result.data;
})
);
}
_performMetricKeyValueLookup(metric: string, keys: any) {
_performMetricKeyValueLookup(metric: string, keys: any): Observable<any[]> {
if (!metric || !keys) {
return Promise.resolve([]);
return of([]);
}
const keysArray = keys.split(',').map((key: any) => {
......@@ -206,38 +230,45 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
const m = metric + '{' + keysQuery + '}';
return this._get('/api/search/lookup', { m: m, limit: this.lookupLimit }).then((result: any) => {
result = result.data.results;
const tagvs: any[] = [];
_.each(result, r => {
if (tagvs.indexOf(r.tags[key]) === -1) {
tagvs.push(r.tags[key]);
}
});
return tagvs;
});
return this._get('/api/search/lookup', { m: m, limit: this.lookupLimit }).pipe(
map((result: any) => {
result = result.data.results;
const tagvs: any[] = [];
_.each(result, r => {
if (tagvs.indexOf(r.tags[key]) === -1) {
tagvs.push(r.tags[key]);
}
});
return tagvs;
})
);
}
_performMetricKeyLookup(metric: any) {
_performMetricKeyLookup(metric: any): Observable<any[]> {
if (!metric) {
return Promise.resolve([]);
return of([]);
}
return this._get('/api/search/lookup', { m: metric, limit: 1000 }).then((result: any) => {
result = result.data.results;
const tagks: any[] = [];
_.each(result, r => {
_.each(r.tags, (tagv, tagk) => {
if (tagks.indexOf(tagk) === -1) {
tagks.push(tagk);
}
return this._get('/api/search/lookup', { m: metric, limit: 1000 }).pipe(
map((result: any) => {
result = result.data.results;
const tagks: any[] = [];
_.each(result, r => {
_.each(r.tags, (tagv, tagk) => {
if (tagks.indexOf(tagk) === -1) {
tagks.push(tagk);
}
});
});
});
return tagks;
});
return tagks;
})
);
}
_get(relativeUrl: string, params?: { type?: string; q?: string; max?: number; m?: any; limit?: number }) {
_get(
relativeUrl: string,
params?: { type?: string; q?: string; max?: number; m?: any; limit?: number }
): Observable<FetchResponse> {
const options = {
method: 'GET',
url: this.url + relativeUrl,
......@@ -246,7 +277,7 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
this._addCredentialOptions(options);
return getBackendSrv().datasourceRequest(options);
return getBackendSrv().fetch(options);
}
_addCredentialOptions(options: any) {
......@@ -284,36 +315,50 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
const metricsQuery = interpolated.match(metricsRegex);
if (metricsQuery) {
return this._performSuggestQuery(metricsQuery[1], 'metrics').then(responseTransform);
return this._performSuggestQuery(metricsQuery[1], 'metrics')
.pipe(map(responseTransform))
.toPromise();
}
const tagNamesQuery = interpolated.match(tagNamesRegex);
if (tagNamesQuery) {
return this._performMetricKeyLookup(tagNamesQuery[1]).then(responseTransform);
return this._performMetricKeyLookup(tagNamesQuery[1])
.pipe(map(responseTransform))
.toPromise();
}
const tagValuesQuery = interpolated.match(tagValuesRegex);
if (tagValuesQuery) {
return this._performMetricKeyValueLookup(tagValuesQuery[1], tagValuesQuery[2]).then(responseTransform);
return this._performMetricKeyValueLookup(tagValuesQuery[1], tagValuesQuery[2])
.pipe(map(responseTransform))
.toPromise();
}
const tagNamesSuggestQuery = interpolated.match(tagNamesSuggestRegex);
if (tagNamesSuggestQuery) {
return this._performSuggestQuery(tagNamesSuggestQuery[1], 'tagk').then(responseTransform);
return this._performSuggestQuery(tagNamesSuggestQuery[1], 'tagk')
.pipe(map(responseTransform))
.toPromise();
}
const tagValuesSuggestQuery = interpolated.match(tagValuesSuggestRegex);
if (tagValuesSuggestQuery) {
return this._performSuggestQuery(tagValuesSuggestQuery[1], 'tagv').then(responseTransform);
return this._performSuggestQuery(tagValuesSuggestQuery[1], 'tagv')
.pipe(map(responseTransform))
.toPromise();
}
return Promise.resolve([]);
}
testDatasource() {
return this._performSuggestQuery('cpu', 'metrics').then(() => {
return { status: 'success', message: 'Data source is working' };
});
return this._performSuggestQuery('cpu', 'metrics')
.pipe(
map(() => {
return { status: 'success', message: 'Data source is working' };
})
)
.toPromise();
}
getAggregators() {
......@@ -321,12 +366,16 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
return this.aggregatorsPromise;
}
this.aggregatorsPromise = this._get('/api/aggregators').then((result: any) => {
if (result.data && _.isArray(result.data)) {
return result.data.sort();
}
return [];
});
this.aggregatorsPromise = this._get('/api/aggregators')
.pipe(
map((result: any) => {
if (result.data && _.isArray(result.data)) {
return result.data.sort();
}
return [];
})
)
.toPromise();
return this.aggregatorsPromise;
}
......@@ -335,12 +384,16 @@ export default class OpenTsDatasource extends DataSourceApi<OpenTsdbQuery, OpenT
return this.filterTypesPromise;
}
this.filterTypesPromise = this._get('/api/config/filters').then((result: any) => {
if (result.data) {
return Object.keys(result.data).sort();
}
return [];
});
this.filterTypesPromise = this._get('/api/config/filters')
.pipe(
map((result: any) => {
if (result.data) {
return Object.keys(result.data).sort();
}
return [];
})
)
.toPromise();
return this.filterTypesPromise;
}
......
import OpenTsDatasource from '../datasource';
import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__
import { OpenTsdbQuery } from '../types';
import { createFetchResponse } from '../../../../../test/helpers/createFetchResponse';
import { of } from 'rxjs';
jest.mock('@grafana/runtime', () => ({
...((jest.requireActual('@grafana/runtime') as unknown) as object),
getBackendSrv: () => backendSrv,
}));
describe('opentsdb', () => {
const datasourceRequestMock = jest.spyOn(backendSrv, 'datasourceRequest');
const metricFindQueryData = [
{
target: 'prod1.count',
datapoints: [
[10, 1],
[12, 1],
],
},
];
beforeEach(() => {
describe('opentsdb', () => {
function getTestcontext({ data = metricFindQueryData }: { data?: any } = {}) {
jest.clearAllMocks();
});
const fetchMock = jest.spyOn(backendSrv, 'fetch');
fetchMock.mockImplementation(() => of(createFetchResponse(data)));
const ctx = {
ds: {},
templateSrv: {
replace: (str: string) => str,
},
} as any;
const instanceSettings = { url: '', jsonData: { tsdbVersion: 1 } };
const instanceSettings = { url: '', jsonData: { tsdbVersion: 1 } };
const replace = jest.fn(value => value);
const templateSrv: any = {
replace,
};
beforeEach(() => {
ctx.ctrl = new OpenTsDatasource(instanceSettings, ctx.templateSrv);
});
const ds = new OpenTsDatasource(instanceSettings, templateSrv);
return { ds, templateSrv, fetchMock };
}
describe('When performing metricFindQuery', () => {
let results: any;
let requestOptions: any;
beforeEach(async () => {
datasourceRequestMock.mockImplementation(
await ((options: any) => {
requestOptions = options;
return Promise.resolve({
data: [
{
target: 'prod1.count',
datapoints: [
[10, 1],
[12, 1],
],
},
],
});
})
);
});
it('metrics() should generate api suggest query', async () => {
const { ds, fetchMock } = getTestcontext();
const results = await ds.metricFindQuery('metrics(pew)');
it('metrics() should generate api suggest query', () => {
ctx.ctrl.metricFindQuery('metrics(pew)').then((data: any) => {
results = data;
});
expect(requestOptions.url).toBe('/api/suggest');
expect(requestOptions.params.type).toBe('metrics');
expect(requestOptions.params.q).toBe('pew');
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][0].url).toBe('/api/suggest');
expect(fetchMock.mock.calls[0][0].params?.type).toBe('metrics');
expect(fetchMock.mock.calls[0][0].params?.q).toBe('pew');
expect(results).not.toBe(null);
});
it('tag_names(cpu) should generate lookup query', () => {
ctx.ctrl.metricFindQuery('tag_names(cpu)').then((data: any) => {
results = data;
});
expect(requestOptions.url).toBe('/api/search/lookup');
expect(requestOptions.params.m).toBe('cpu');
});
it('tag_names(cpu) should generate lookup query', async () => {
const { ds, fetchMock } = getTestcontext();
const results = await ds.metricFindQuery('tag_names(cpu)');
it('tag_values(cpu, test) should generate lookup query', () => {
ctx.ctrl.metricFindQuery('tag_values(cpu, hostname)').then((data: any) => {
results = data;
});
expect(requestOptions.url).toBe('/api/search/lookup');
expect(requestOptions.params.m).toBe('cpu{hostname=*}');
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][0].url).toBe('/api/search/lookup');
expect(fetchMock.mock.calls[0][0].params?.m).toBe('cpu');
expect(results).not.toBe(null);
});
it('tag_values(cpu, test) should generate lookup query', () => {
ctx.ctrl.metricFindQuery('tag_values(cpu, hostname, env=$env)').then((data: any) => {
results = data;
});
expect(requestOptions.url).toBe('/api/search/lookup');
expect(requestOptions.params.m).toBe('cpu{hostname=*,env=$env}');
it('tag_values(cpu, test) should generate lookup query', async () => {
const { ds, fetchMock } = getTestcontext();
const results = await ds.metricFindQuery('tag_values(cpu, hostname)');
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][0].url).toBe('/api/search/lookup');
expect(fetchMock.mock.calls[0][0].params?.m).toBe('cpu{hostname=*}');
expect(results).not.toBe(null);
});
it('tag_values(cpu, test) should generate lookup query', () => {
ctx.ctrl.metricFindQuery('tag_values(cpu, hostname, env=$env, region=$region)').then((data: any) => {
results = data;
});
expect(requestOptions.url).toBe('/api/search/lookup');
expect(requestOptions.params.m).toBe('cpu{hostname=*,env=$env,region=$region}');
it('tag_values(cpu, test) should generate lookup query', async () => {
const { ds, fetchMock } = getTestcontext();
const results = await ds.metricFindQuery('tag_values(cpu, hostname, env=$env)');
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][0].url).toBe('/api/search/lookup');
expect(fetchMock.mock.calls[0][0].params?.m).toBe('cpu{hostname=*,env=$env}');
expect(results).not.toBe(null);
});
it('suggest_tagk() should generate api suggest query', () => {
ctx.ctrl.metricFindQuery('suggest_tagk(foo)').then((data: any) => {
results = data;
});
expect(requestOptions.url).toBe('/api/suggest');
expect(requestOptions.params.type).toBe('tagk');
expect(requestOptions.params.q).toBe('foo');
it('tag_values(cpu, test) should generate lookup query', async () => {
const { ds, fetchMock } = getTestcontext();
const results = await ds.metricFindQuery('tag_values(cpu, hostname, env=$env, region=$region)');
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][0].url).toBe('/api/search/lookup');
expect(fetchMock.mock.calls[0][0].params?.m).toBe('cpu{hostname=*,env=$env,region=$region}');
expect(results).not.toBe(null);
});
it('suggest_tagv() should generate api suggest query', () => {
ctx.ctrl.metricFindQuery('suggest_tagv(bar)').then((data: any) => {
results = data;
});
expect(requestOptions.url).toBe('/api/suggest');
expect(requestOptions.params.type).toBe('tagv');
expect(requestOptions.params.q).toBe('bar');
it('suggest_tagk() should generate api suggest query', async () => {
const { ds, fetchMock } = getTestcontext();
const results = await ds.metricFindQuery('suggest_tagk(foo)');
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][0].url).toBe('/api/suggest');
expect(fetchMock.mock.calls[0][0].params?.type).toBe('tagk');
expect(fetchMock.mock.calls[0][0].params?.q).toBe('foo');
expect(results).not.toBe(null);
});
});
describe('When interpolating variables', () => {
beforeEach(() => {
jest.clearAllMocks();
it('suggest_tagv() should generate api suggest query', async () => {
const { ds, fetchMock } = getTestcontext();
ctx.mockedTemplateSrv = {
replace: jest.fn(),
};
const results = await ds.metricFindQuery('suggest_tagv(bar)');
ctx.ds = new OpenTsDatasource(instanceSettings, ctx.mockedTemplateSrv);
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][0].url).toBe('/api/suggest');
expect(fetchMock.mock.calls[0][0].params?.type).toBe('tagv');
expect(fetchMock.mock.calls[0][0].params?.q).toBe('bar');
expect(results).not.toBe(null);
});
});
describe('When interpolating variables', () => {
it('should return an empty array if no queries are provided', () => {
expect(ctx.ds.interpolateVariablesInQueries([], {})).toHaveLength(0);
const { ds } = getTestcontext();
expect(ds.interpolateVariablesInQueries([], {})).toHaveLength(0);
});
it('should replace correct variables', () => {
const { ds, templateSrv } = getTestcontext();
const variableName = 'someVar';
const logQuery: OpenTsdbQuery = {
refId: 'someRefId',
metric: `$${variableName}`,
};
ctx.ds.interpolateVariablesInQueries([logQuery], {});
ds.interpolateVariablesInQueries([logQuery], {});
expect(ctx.mockedTemplateSrv.replace).toHaveBeenCalledWith(`$${variableName}`, {});
expect(ctx.mockedTemplateSrv.replace).toHaveBeenCalledTimes(1);
expect(templateSrv.replace).toHaveBeenCalledWith('$someVar', {});
expect(templateSrv.replace).toHaveBeenCalledTimes(1);
});
});
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment