Commit e7f56a74 by Ryan McKinley Committed by GitHub

Refactor: split PanelQueryRunner into runner and state (#16685)

* check for running

* split out panel state

* adding test file

* remove bad test
parent 178ce8ee
...@@ -4,7 +4,6 @@ import { Subject, Unsubscribable, PartialObserver } from 'rxjs'; ...@@ -4,7 +4,6 @@ import { Subject, Unsubscribable, PartialObserver } from 'rxjs';
// Services & Utils // Services & Utils
import { getDatasourceSrv } from 'app/features/plugins/datasource_srv'; import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
import { getBackendSrv } from 'app/core/services/backend_srv';
import kbn from 'app/core/utils/kbn'; import kbn from 'app/core/utils/kbn';
import templateSrv from 'app/features/templating/template_srv'; import templateSrv from 'app/features/templating/template_srv';
...@@ -19,11 +18,9 @@ import { ...@@ -19,11 +18,9 @@ import {
ScopedVars, ScopedVars,
DataQueryRequest, DataQueryRequest,
SeriesData, SeriesData,
DataQueryError,
toLegacyResponseData,
isSeriesData,
DataSourceApi, DataSourceApi,
} from '@grafana/ui'; } from '@grafana/ui';
import { PanelQueryState } from './PanelQueryState';
export interface QueryRunnerOptions<TQuery extends DataQuery = DataQuery> { export interface QueryRunnerOptions<TQuery extends DataQuery = DataQuery> {
datasource: string | DataSourceApi<TQuery>; datasource: string | DataSourceApi<TQuery>;
...@@ -55,13 +52,7 @@ function getNextRequestId() { ...@@ -55,13 +52,7 @@ function getNextRequestId() {
export class PanelQueryRunner { export class PanelQueryRunner {
private subject?: Subject<PanelData>; private subject?: Subject<PanelData>;
private sendSeries = false; private state = new PanelQueryState();
private sendLegacy = false;
private data = {
state: LoadingState.NotStarted,
series: [],
} as PanelData;
/** /**
* Listen for updates to the PanelData. If a query has already run for this panel, * Listen for updates to the PanelData. If a query has already run for this panel,
...@@ -73,18 +64,17 @@ export class PanelQueryRunner { ...@@ -73,18 +64,17 @@ export class PanelQueryRunner {
} }
if (format === PanelQueryRunnerFormat.legacy) { if (format === PanelQueryRunnerFormat.legacy) {
this.sendLegacy = true; this.state.sendLegacy = true;
} else if (format === PanelQueryRunnerFormat.both) { } else if (format === PanelQueryRunnerFormat.both) {
this.sendSeries = true; this.state.sendSeries = true;
this.sendLegacy = true; this.state.sendLegacy = true;
} else { } else {
this.sendSeries = true; this.state.sendSeries = true;
} }
// Send the last result // Send the last result
if (this.data.state !== LoadingState.NotStarted) { if (this.state.data.state !== LoadingState.NotStarted) {
// TODO: make sure it has legacy if necessary observer.next(this.state.getDataAfterCheckingFormats());
observer.next(this.data);
} }
return this.subject.subscribe(observer); return this.subject.subscribe(observer);
...@@ -95,6 +85,8 @@ export class PanelQueryRunner { ...@@ -95,6 +85,8 @@ export class PanelQueryRunner {
this.subject = new Subject(); this.subject = new Subject();
} }
const { state } = this;
const { const {
queries, queries,
timezone, timezone,
...@@ -120,7 +112,11 @@ export class PanelQueryRunner { ...@@ -120,7 +112,11 @@ export class PanelQueryRunner {
timeInfo, timeInfo,
interval: '', interval: '',
intervalMs: 0, intervalMs: 0,
targets: cloneDeep(queries), targets: cloneDeep(
queries.filter(q => {
return !q.hide; // Skip any hidden queries
})
),
maxDataPoints: maxDataPoints || widthPixels, maxDataPoints: maxDataPoints || widthPixels,
scopedVars: scopedVars || {}, scopedVars: scopedVars || {},
cacheTimeout, cacheTimeout,
...@@ -129,15 +125,6 @@ export class PanelQueryRunner { ...@@ -129,15 +125,6 @@ export class PanelQueryRunner {
// Deprecated // Deprecated
(request as any).rangeRaw = timeRange.raw; (request as any).rangeRaw = timeRange.raw;
if (!queries) {
return this.publishUpdate({
state: LoadingState.Done,
series: [], // Clear the data
legacy: [],
request,
});
}
let loadingStateTimeoutId = 0; let loadingStateTimeoutId = 0;
try { try {
...@@ -159,77 +146,40 @@ export class PanelQueryRunner { ...@@ -159,77 +146,40 @@ export class PanelQueryRunner {
request.interval = norm.interval; request.interval = norm.interval;
request.intervalMs = norm.intervalMs; request.intervalMs = norm.intervalMs;
// Check if we can reuse the already issued query
if (state.isRunning()) {
if (state.isSameQuery(ds, request)) {
// TODO? maybe cancel if it has run too long?
return state.getCurrentExecutor();
} else {
state.cancel('Query Changed while running');
}
}
// Send a loading status event on slower queries // Send a loading status event on slower queries
loadingStateTimeoutId = window.setTimeout(() => { loadingStateTimeoutId = window.setTimeout(() => {
this.publishUpdate({ state: LoadingState.Loading }); if (this.state.isRunning()) {
this.subject.next(this.state.data);
}
}, delayStateNotification || 500); }, delayStateNotification || 500);
const resp = await ds.query(request); const data = await state.execute(ds, request);
request.endTime = Date.now();
// Make sure we send something back -- called run() w/o subscribe!
if (!(this.sendSeries || this.sendLegacy)) {
this.sendSeries = true;
}
// Make sure the response is in a supported format
const series = this.sendSeries ? getProcessedSeriesData(resp.data) : [];
const legacy = this.sendLegacy
? resp.data.map(v => {
if (isSeriesData(v)) {
return toLegacyResponseData(v);
}
return v;
})
: undefined;
// Make sure the delayed loading state timeout is cleared // Clear the delayed loading state timeout
clearTimeout(loadingStateTimeoutId); clearTimeout(loadingStateTimeoutId);
// Publish the result // Broadcast results
return this.publishUpdate({ this.subject.next(data);
state: LoadingState.Done, return data;
series,
legacy,
request,
});
} catch (err) { } catch (err) {
const error = err as DataQueryError;
if (!error.message) {
let message = 'Query error';
if (error.message) {
message = error.message;
} else if (error.data && error.data.message) {
message = error.data.message;
} else if (error.data && error.data.error) {
message = error.data.error;
} else if (error.status) {
message = `Query error: ${error.status} ${error.statusText}`;
}
error.message = message;
}
// Make sure the delayed loading state timeout is cleared
clearTimeout(loadingStateTimeoutId); clearTimeout(loadingStateTimeoutId);
return this.publishUpdate({ const data = state.setError(err);
state: LoadingState.Error, this.subject.next(data);
error: error, return data;
});
} }
} }
publishUpdate(update: Partial<PanelData>): PanelData {
this.data = {
...this.data,
...update,
};
this.subject.next(this.data);
return this.data;
}
/** /**
* Called when the panel is closed * Called when the panel is closed
*/ */
...@@ -239,11 +189,8 @@ export class PanelQueryRunner { ...@@ -239,11 +189,8 @@ export class PanelQueryRunner {
this.subject.complete(); this.subject.complete();
} }
// If there are open HTTP requests, close them // Will cancel and disconnect any open requets
const { request } = this.data; this.state.cancel('destroy');
if (request && request.requestId) {
getBackendSrv().resolveCancelerIfExists(request.requestId);
}
} }
} }
......
import { toDataQueryError, PanelQueryState } from './PanelQueryState';
import { MockDataSourceApi } from 'test/mocks/datasource_srv';
import { DataQueryResponse } from '@grafana/ui';
import { getQueryOptions } from 'test/helpers/getQueryOptions';
describe('PanelQueryState', () => {
it('converts anythign to an error', () => {
let err = toDataQueryError(undefined);
expect(err.message).toEqual('Query error');
err = toDataQueryError('STRING ERRROR');
expect(err.message).toEqual('STRING ERRROR');
err = toDataQueryError({ message: 'hello' });
expect(err.message).toEqual('hello');
});
it('keeps track of running queries', async () => {
const state = new PanelQueryState();
expect(state.isRunning()).toBeFalsy();
let hasRun = false;
const dsRunner = new Promise<DataQueryResponse>((resolve, reject) => {
// The status should be running when we get here
expect(state.isRunning()).toBeTruthy();
resolve({ data: ['x', 'y'] });
hasRun = true;
});
const ds = new MockDataSourceApi('test');
ds.queryResolver = dsRunner;
// should not actually run for an empty query
let empty = await state.execute(ds, getQueryOptions({}));
expect(state.isRunning()).toBeFalsy();
expect(empty.series.length).toBe(0);
expect(hasRun).toBeFalsy();
empty = await state.execute(
ds,
getQueryOptions({ targets: [{ hide: true, refId: 'X' }, { hide: true, refId: 'Y' }, { hide: true, refId: 'Z' }] })
);
// should not run any hidden queries'
expect(state.isRunning()).toBeFalsy();
expect(empty.series.length).toBe(0);
expect(hasRun).toBeFalsy();
});
});
import {
DataSourceApi,
DataQueryRequest,
PanelData,
LoadingState,
toLegacyResponseData,
isSeriesData,
toSeriesData,
DataQueryError,
} from '@grafana/ui';
import { getProcessedSeriesData } from './PanelQueryRunner';
import { getBackendSrv } from 'app/core/services/backend_srv';
import isEqual from 'lodash/isEqual';
export class PanelQueryState {
// The current/last running request
request = {
startTime: 0,
endTime: 1000, // Somethign not zero
} as DataQueryRequest;
// The best known state of data
data = {
state: LoadingState.NotStarted,
series: [],
} as PanelData;
sendSeries = false;
sendLegacy = false;
// A promise for the running query
private executor: Promise<PanelData> = {} as any;
private rejector = (reason?: any) => {};
private datasource: DataSourceApi = {} as any;
isRunning() {
return this.data.state === LoadingState.Loading; //
}
isSameQuery(ds: DataSourceApi, req: DataQueryRequest) {
if (this.datasource !== this.datasource) {
return false;
}
// For now just check that the targets look the same
return isEqual(this.request.targets, req.targets);
}
getCurrentExecutor() {
return this.executor;
}
cancel(reason: string) {
const { request } = this;
try {
if (!request.endTime) {
request.endTime = Date.now();
this.rejector('Canceled:' + reason);
}
// Cancel any open HTTP request with the same ID
if (request.requestId) {
getBackendSrv().resolveCancelerIfExists(request.requestId);
}
} catch (err) {
console.log('Error canceling request');
}
}
execute(ds: DataSourceApi, req: DataQueryRequest): Promise<PanelData> {
this.request = req;
console.log('EXXXX', req);
// Return early if there are no queries to run
if (!req.targets.length) {
console.log('No queries, so return early');
this.request.endTime = Date.now();
return Promise.resolve(
(this.data = {
state: LoadingState.Done,
series: [], // Clear the data
legacy: [],
request: req,
})
);
}
// Set the loading state immediatly
this.data.state = LoadingState.Loading;
return (this.executor = new Promise<PanelData>((resolve, reject) => {
this.rejector = reject;
return ds
.query(this.request)
.then(resp => {
this.request.endTime = Date.now();
// Make sure we send something back -- called run() w/o subscribe!
if (!(this.sendSeries || this.sendLegacy)) {
this.sendSeries = true;
}
// Make sure the response is in a supported format
const series = this.sendSeries ? getProcessedSeriesData(resp.data) : [];
const legacy = this.sendLegacy
? resp.data.map(v => {
if (isSeriesData(v)) {
return toLegacyResponseData(v);
}
return v;
})
: undefined;
resolve(
(this.data = {
state: LoadingState.Done,
request: this.request,
series,
legacy,
})
);
})
.catch(err => {
resolve(this.setError(err));
});
}));
}
/**
* Make sure all requested formats exist on the data
*/
getDataAfterCheckingFormats(): PanelData {
const { data, sendLegacy, sendSeries } = this;
if (sendLegacy && (!data.legacy || !data.legacy.length)) {
data.legacy = data.series.map(v => toLegacyResponseData(v));
}
if (sendSeries && !data.series.length && data.legacy) {
data.series = data.legacy.map(v => toSeriesData(v));
}
return this.data;
}
setError(err: any): PanelData {
if (!this.request.endTime) {
this.request.endTime = Date.now();
}
return (this.data = {
...this.data, // Keep any existing data
state: LoadingState.Error,
error: toDataQueryError(err),
request: this.request,
});
}
}
export function toDataQueryError(err: any): DataQueryError {
const error = (err || {}) as DataQueryError;
if (!error.message) {
if (typeof err === 'string' || err instanceof String) {
return { message: err } as DataQueryError;
}
let message = 'Query error';
if (error.message) {
message = error.message;
} else if (error.data && error.data.message) {
message = error.data.message;
} else if (error.data && error.data.error) {
message = error.data.error;
} else if (error.status) {
message = `Query error: ${error.status} ${error.statusText}`;
}
error.message = message;
}
return error;
}
import { DataSourceApi, DataQueryRequest, DataQueryResponse } from '@grafana/ui';
export class DatasourceSrvMock {
constructor(private defaultDS: DataSourceApi, private datasources: { [name: string]: DataSourceApi }) {
//
}
get(name?: string): Promise<DataSourceApi> {
if (!name) {
return Promise.resolve(this.defaultDS);
}
const ds = this.datasources[name];
if (ds) {
return Promise.resolve(ds);
}
return Promise.reject('Unknown Datasource: ' + name);
}
}
export class MockDataSourceApi implements DataSourceApi {
name: string;
result: DataQueryResponse = { data: [] };
queryResolver: Promise<DataQueryResponse>;
constructor(DataQueryResponse, name?: string) {
this.name = name ? name : 'MockDataSourceApi';
}
query(request: DataQueryRequest): Promise<DataQueryResponse> {
if (this.queryResolver) {
return this.queryResolver;
}
return Promise.resolve(this.result);
}
testDatasource() {
return Promise.resolve();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment