Commit 5ca643f2 by Hugo Häggmark Committed by Torkel Ödegaard

Explore: Use PanelQueryState to handle querying (#18694)

* WIP: inital POC

* Wip: Moving forward

* Wip

* Refactor: Makes loading indicator work for Prometheus

* Refactor: Reverts prom observable queries because they did not work for multiple targets

* Refactor: Transforms all epics into thunks

* Fix: Fixes scanning

* Fix: Fixes so that Instant and TimeSeries Prom query loads in parallel

* Fix: Fixes negation logic error

* Propagate errors in stream events, and close streams
parent f942fecc
......@@ -230,7 +230,6 @@
"react-window": "1.7.1",
"redux": "4.0.1",
"redux-logger": "3.0.6",
"redux-observable": "1.1.0",
"redux-thunk": "2.3.0",
"reselect": "4.0.0",
"rst2html": "github:thoward/rst2html#990cb89",
......
......@@ -2,14 +2,23 @@
import _ from 'lodash';
import { from } from 'rxjs';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
// Services & Utils
import { dateMath } from '@grafana/data';
import {
dateMath,
toUtc,
TimeRange,
RawTimeRange,
TimeZone,
IntervalValues,
TimeFragment,
LogRowModel,
LogsModel,
LogsDedupStrategy,
} from '@grafana/data';
import { renderUrl } from 'app/core/utils/url';
import kbn from 'app/core/utils/kbn';
import store from 'app/core/store';
import { getNextRefIdChar } from './query';
// Types
import {
DataQuery,
......@@ -20,17 +29,6 @@ import {
DataStreamObserver,
} from '@grafana/ui';
import {
toUtc,
TimeRange,
RawTimeRange,
TimeZone,
IntervalValues,
TimeFragment,
LogRowModel,
LogsModel,
LogsDedupStrategy,
} from '@grafana/data';
import {
ExploreUrlState,
HistoryItem,
QueryTransaction,
......@@ -39,6 +37,7 @@ import {
ExploreMode,
} from 'app/types/explore';
import { config } from '../config';
import { PanelQueryState } from '../../features/dashboard/state/PanelQueryState';
export const DEFAULT_RANGE = {
from: 'now-1h',
......@@ -145,6 +144,7 @@ export function buildQueryTransaction(
panelId,
targets: configuredQueries, // Datasources rely on DataQueries being passed under the targets key.
range,
requestId: 'explore',
rangeRaw: range.raw,
scopedVars: {
__interval: { text: interval, value: interval },
......@@ -542,3 +542,10 @@ export const getQueryResponse = (
) => {
return from(datasourceInstance.query(options, observer));
};
export const stopQueryState = (queryState: PanelQueryState, reason: string) => {
if (queryState && queryState.isStarted()) {
queryState.cancel(reason);
queryState.closeStreams(false);
}
};
......@@ -207,4 +207,19 @@ describe('stream handling', () => {
expect(data.series[0].refId).toBe('F');
expect(state.streams.length).toBe(0); // no streams
});
it('should close streams on error', () => {
// Post a stream event
state.dataStreamObserver({
state: LoadingState.Error,
key: 'C',
error: { message: 'EEEEE' },
data: [],
request: state.request,
unsubscribe: () => {},
});
expect(state.streams.length).toBe(0);
expect(state.response.state).toBe(LoadingState.Error);
});
});
// Libraries
import { isArray, isEqual, isString } from 'lodash';
// Utils & Services
import { getBackendSrv } from 'app/core/services/backend_srv';
import { dateMath } from '@grafana/data';
import {
dateMath,
guessFieldTypes,
LoadingState,
toLegacyResponseData,
......@@ -12,7 +11,6 @@ import {
toDataFrame,
isDataFrame,
} from '@grafana/data';
// Types
import {
DataSourceApi,
......@@ -161,6 +159,12 @@ export class PanelQueryState {
// Streams only work with the 'series' format
this.sendFrames = true;
if (stream.state === LoadingState.Error) {
this.setError(stream.error);
this.onStreamingDataUpdated();
return;
}
// Add the stream to our list
let found = false;
const active = this.streams.map(s => {
......
......@@ -8,6 +8,7 @@ import { Editor } from 'slate-react';
// @ts-ignore
import Plain from 'slate-plain-serializer';
import classnames from 'classnames';
// @ts-ignore
import { isKeyHotkey } from 'is-hotkey';
import { CompletionItem, CompletionItemGroup, TypeaheadOutput } from 'app/types/explore';
......
// Types
import { Emitter } from 'app/core/core';
import {
DataQuery,
DataSourceSelectItem,
DataSourceApi,
QueryFixAction,
DataQueryError,
DataQueryResponseData,
} from '@grafana/ui';
import {
RawTimeRange,
LogLevel,
TimeRange,
DataFrame,
LogsModel,
LoadingState,
AbsoluteTimeRange,
GraphSeriesXY,
} from '@grafana/data';
import { ExploreId, ExploreItemState, HistoryItem, ExploreUIState, ExploreMode, QueryOptions } from 'app/types/explore';
import { DataQuery, DataSourceSelectItem, DataSourceApi, QueryFixAction, DataQueryError } from '@grafana/ui';
import { LogLevel, TimeRange, LogsModel, LoadingState, AbsoluteTimeRange, GraphSeriesXY } from '@grafana/data';
import { ExploreId, ExploreItemState, HistoryItem, ExploreUIState, ExploreMode } from 'app/types/explore';
import { actionCreatorFactory, noPayloadActionCreatorFactory, ActionOf } from 'app/core/redux/actionCreatorFactory';
import TableModel from 'app/core/table_model';
......@@ -230,42 +214,15 @@ export interface SetUrlReplacedPayload {
exploreId: ExploreId;
}
export interface ProcessQueryErrorsPayload {
exploreId: ExploreId;
response: any;
datasourceId: string;
}
export interface ProcessQueryResultsPayload {
exploreId: ExploreId;
latency: number;
datasourceId: string;
loadingState: LoadingState;
series?: DataQueryResponseData[];
delta?: DataFrame[];
}
export interface RunQueriesBatchPayload {
exploreId: ExploreId;
queryOptions: QueryOptions;
}
export interface LimitMessageRatePayload {
series: DataFrame[];
exploreId: ExploreId;
datasourceId: string;
}
export interface ChangeRangePayload {
exploreId: ExploreId;
range: TimeRange;
absoluteRange: AbsoluteTimeRange;
}
export interface UpdateTimeRangePayload {
export interface ChangeLoadingStatePayload {
exploreId: ExploreId;
rawRange?: RawTimeRange;
absoluteRange?: AbsoluteTimeRange;
loadingState: LoadingState;
}
/**
......@@ -410,8 +367,6 @@ export const splitCloseAction = actionCreatorFactory<SplitCloseActionPayload>('e
*/
export const splitOpenAction = actionCreatorFactory<SplitOpenPayload>('explore/SPLIT_OPEN').create();
export const stateSaveAction = noPayloadActionCreatorFactory('explore/STATE_SAVE').create();
/**
* Update state of Explores UI elements (panels visiblity and deduplication strategy)
*/
......@@ -460,23 +415,11 @@ export const resetQueryErrorAction = actionCreatorFactory<ResetQueryErrorPayload
export const setUrlReplacedAction = actionCreatorFactory<SetUrlReplacedPayload>('explore/SET_URL_REPLACED').create();
export const processQueryErrorsAction = actionCreatorFactory<ProcessQueryErrorsPayload>(
'explore/PROCESS_QUERY_ERRORS'
).create();
export const processQueryResultsAction = actionCreatorFactory<ProcessQueryResultsPayload>(
'explore/PROCESS_QUERY_RESULTS'
).create();
export const runQueriesBatchAction = actionCreatorFactory<RunQueriesBatchPayload>('explore/RUN_QUERIES_BATCH').create();
export const limitMessageRatePayloadAction = actionCreatorFactory<LimitMessageRatePayload>(
'explore/LIMIT_MESSAGE_RATE_PAYLOAD'
).create();
export const changeRangeAction = actionCreatorFactory<ChangeRangePayload>('explore/CHANGE_RANGE').create();
export const updateTimeRangeAction = actionCreatorFactory<UpdateTimeRangePayload>('explore/UPDATE_TIMERANGE').create();
export const changeLoadingStateAction = actionCreatorFactory<ChangeLoadingStatePayload>(
'changeLoadingStateAction'
).create();
export type HigherOrderAction =
| ActionOf<SplitCloseActionPayload>
......
......@@ -11,14 +11,12 @@ import {
testDataSourceFailureAction,
loadDatasourcePendingAction,
loadDatasourceReadyAction,
updateTimeRangeAction,
} from './actionTypes';
import { Emitter } from 'app/core/core';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { makeInitialUpdateState } from './reducers';
import { DataQuery } from '@grafana/ui/src/types/datasource';
import { DefaultTimeZone, RawTimeRange, LogsDedupStrategy } from '@grafana/data';
import { toUtc } from '@grafana/data';
import { DefaultTimeZone, RawTimeRange, LogsDedupStrategy, toUtc } from '@grafana/data';
jest.mock('app/features/plugins/datasource_srv', () => ({
getDatasourceSrv: () => ({
......@@ -30,6 +28,12 @@ jest.mock('app/features/plugins/datasource_srv', () => ({
}),
}));
jest.mock('../../dashboard/services/TimeSrv', () => ({
getTimeSrv: jest.fn().mockReturnValue({
init: jest.fn(),
}),
}));
const t = toUtc();
const testRange = {
from: t,
......@@ -62,6 +66,7 @@ const setup = (updateOverides?: Partial<ExploreUpdateState>) => {
const update = { ...updateDefaults, ...updateOverides };
const initialState = {
user: {
orgId: '1',
timeZone,
},
explore: {
......@@ -118,19 +123,6 @@ describe('refreshExplore', () => {
});
});
describe('and update range is set', () => {
it('then it should dispatch updateTimeRangeAction', async () => {
const { exploreId, range, initialState } = setup({ range: true });
const dispatchedActions = await thunkTester(initialState)
.givenThunk(refreshExplore)
.whenThunkIsDispatched(exploreId);
expect(dispatchedActions[0].type).toEqual(updateTimeRangeAction.type);
expect(dispatchedActions[0].payload).toEqual({ exploreId, rawRange: range.raw });
});
});
describe('and update ui is set', () => {
it('then it should dispatch updateUIStateAction', async () => {
const { exploreId, initialState, ui } = setup({ ui: true });
......
import { Epic } from 'redux-observable';
import { map, throttleTime } from 'rxjs/operators';
import { LoadingState } from '@grafana/data';
import { StoreState } from 'app/types';
import { ActionOf } from '../../../../core/redux/actionCreatorFactory';
import { limitMessageRatePayloadAction, LimitMessageRatePayload, processQueryResultsAction } from '../actionTypes';
import { EpicDependencies } from 'app/store/configureStore';
export const limitMessageRateEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies> = action$ => {
return action$.ofType(limitMessageRatePayloadAction.type).pipe(
throttleTime(1),
map((action: ActionOf<LimitMessageRatePayload>) => {
const { exploreId, series, datasourceId } = action.payload;
return processQueryResultsAction({
exploreId,
latency: 0,
datasourceId,
loadingState: LoadingState.Streaming,
series: null,
delta: series,
});
})
);
};
import { mockExploreState } from 'test/mocks/mockExploreState';
import { epicTester } from 'test/core/redux/epicTester';
import { processQueryErrorsAction, queryFailureAction } from '../actionTypes';
import { processQueryErrorsEpic } from './processQueryErrorsEpic';
describe('processQueryErrorsEpic', () => {
let originalConsoleError = console.error;
beforeEach(() => {
originalConsoleError = console.error;
console.error = jest.fn();
});
afterEach(() => {
console.error = originalConsoleError;
});
describe('when processQueryErrorsAction is dispatched', () => {
describe('and datasourceInstance is the same', () => {
describe('and the response is not cancelled', () => {
it('then queryFailureAction is dispatched', () => {
const { datasourceId, exploreId, state, eventBridge } = mockExploreState();
const response = { message: 'Something went terribly wrong!' };
epicTester(processQueryErrorsEpic, state)
.whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId, response }))
.thenResultingActionsEqual(queryFailureAction({ exploreId, response }));
expect(console.error).toBeCalledTimes(1);
expect(console.error).toBeCalledWith(response);
expect(eventBridge.emit).toBeCalledTimes(1);
expect(eventBridge.emit).toBeCalledWith('data-error', response);
});
});
describe('and the response is cancelled', () => {
it('then no actions are dispatched', () => {
const { datasourceId, exploreId, state, eventBridge } = mockExploreState();
const response = { cancelled: true, message: 'Something went terribly wrong!' };
epicTester(processQueryErrorsEpic, state)
.whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId, response }))
.thenNoActionsWhereDispatched();
expect(console.error).not.toBeCalled();
expect(eventBridge.emit).not.toBeCalled();
});
});
});
describe('and datasourceInstance is not the same', () => {
describe('and the response is not cancelled', () => {
it('then no actions are dispatched', () => {
const { exploreId, state, eventBridge } = mockExploreState();
const response = { message: 'Something went terribly wrong!' };
epicTester(processQueryErrorsEpic, state)
.whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId: 'other id', response }))
.thenNoActionsWhereDispatched();
expect(console.error).not.toBeCalled();
expect(eventBridge.emit).not.toBeCalled();
});
});
});
});
});
import { Epic } from 'redux-observable';
import { mergeMap } from 'rxjs/operators';
import { NEVER, of } from 'rxjs';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { instanceOfDataQueryError } from 'app/core/utils/explore';
import { toDataQueryError } from 'app/features/dashboard/state/PanelQueryState';
import { processQueryErrorsAction, ProcessQueryErrorsPayload, queryFailureAction } from '../actionTypes';
export const processQueryErrorsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
return action$.ofType(processQueryErrorsAction.type).pipe(
mergeMap((action: ActionOf<ProcessQueryErrorsPayload>) => {
const { exploreId, datasourceId } = action.payload;
let { response } = action.payload;
const { datasourceInstance, eventBridge } = state$.value.explore[exploreId];
if (datasourceInstance.meta.id !== datasourceId || response.cancelled) {
// Navigated away, queries did not matter
return NEVER;
}
// For Angular editors
eventBridge.emit('data-error', response);
console.error(response); // To help finding problems with query syntax
if (!instanceOfDataQueryError(response)) {
response = toDataQueryError(response);
}
return of(
queryFailureAction({
exploreId,
response,
})
);
})
);
};
import { mockExploreState } from 'test/mocks/mockExploreState';
import { epicTester, MOCKED_ABSOLUTE_RANGE } from 'test/core/redux/epicTester';
import {
processQueryResultsAction,
resetQueryErrorAction,
querySuccessAction,
scanStopAction,
updateTimeRangeAction,
runQueriesAction,
} from '../actionTypes';
import { DataFrame, LoadingState, toDataFrame } from '@grafana/data';
import { processQueryResultsEpic } from './processQueryResultsEpic';
import TableModel from 'app/core/table_model';
const testContext = () => {
const serieA: DataFrame = toDataFrame({
fields: [],
refId: 'A',
});
const serieB: DataFrame = toDataFrame({
fields: [],
refId: 'B',
});
const series = [serieA, serieB];
const latency = 0;
const loadingState = LoadingState.Done;
return {
latency,
series,
loadingState,
};
};
describe('processQueryResultsEpic', () => {
describe('when processQueryResultsAction is dispatched', () => {
describe('and datasourceInstance is the same', () => {
describe('and explore is not scanning', () => {
it('then resetQueryErrorAction and querySuccessAction are dispatched and eventBridge emits correct message', () => {
const { datasourceId, exploreId, state, eventBridge } = mockExploreState();
const { latency, series, loadingState } = testContext();
const graphResult: any[] = [];
const tableResult = new TableModel();
const logsResult: any = null;
epicTester(processQueryResultsEpic, state)
.whenActionIsDispatched(
processQueryResultsAction({ exploreId, datasourceId, loadingState, series, latency })
)
.thenResultingActionsEqual(
resetQueryErrorAction({ exploreId, refIds: ['A', 'B'] }),
querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency })
);
expect(eventBridge.emit).toBeCalledTimes(1);
expect(eventBridge.emit).toBeCalledWith('data-received', series);
});
});
describe('and explore is scanning', () => {
describe('and we have a result', () => {
it('then correct actions are dispatched', () => {
const { datasourceId, exploreId, state } = mockExploreState({ scanning: true });
const { latency, series, loadingState } = testContext();
const graphResult: any[] = [];
const tableResult = new TableModel();
const logsResult: any = null;
epicTester(processQueryResultsEpic, state)
.whenActionIsDispatched(
processQueryResultsAction({ exploreId, datasourceId, loadingState, series, latency })
)
.thenResultingActionsEqual(
resetQueryErrorAction({ exploreId, refIds: ['A', 'B'] }),
querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }),
scanStopAction({ exploreId })
);
});
});
describe('and we do not have a result', () => {
it('then correct actions are dispatched', () => {
const { datasourceId, exploreId, state } = mockExploreState({ scanning: true });
const { latency, loadingState } = testContext();
const graphResult: any[] = [];
const tableResult = new TableModel();
const logsResult: any = null;
epicTester(processQueryResultsEpic, state)
.whenActionIsDispatched(
processQueryResultsAction({ exploreId, datasourceId, loadingState, series: [], latency })
)
.thenResultingActionsEqual(
resetQueryErrorAction({ exploreId, refIds: [] }),
querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }),
updateTimeRangeAction({ exploreId, absoluteRange: MOCKED_ABSOLUTE_RANGE }),
runQueriesAction({ exploreId })
);
});
});
});
});
describe('and datasourceInstance is not the same', () => {
it('then no actions are dispatched and eventBridge does not emit message', () => {
const { exploreId, state, eventBridge } = mockExploreState();
const { series, loadingState } = testContext();
epicTester(processQueryResultsEpic, state)
.whenActionIsDispatched(
processQueryResultsAction({ exploreId, datasourceId: 'other id', loadingState, series, latency: 0 })
)
.thenNoActionsWhereDispatched();
expect(eventBridge.emit).not.toBeCalled();
});
});
});
});
import _ from 'lodash';
import { Epic } from 'redux-observable';
import { mergeMap } from 'rxjs/operators';
import { NEVER } from 'rxjs';
import { LoadingState } from '@grafana/data';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { getRefIds } from 'app/core/utils/explore';
import {
processQueryResultsAction,
ProcessQueryResultsPayload,
querySuccessAction,
resetQueryErrorAction,
scanStopAction,
updateTimeRangeAction,
runQueriesAction,
} from '../actionTypes';
import { ResultProcessor } from '../../utils/ResultProcessor';
export const processQueryResultsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (
action$,
state$,
{ getTimeZone, getShiftedTimeRange }
) => {
return action$.ofType(processQueryResultsAction.type).pipe(
mergeMap((action: ActionOf<ProcessQueryResultsPayload>) => {
const { exploreId, datasourceId, latency, loadingState, series, delta } = action.payload;
const { datasourceInstance, scanning, eventBridge } = state$.value.explore[exploreId];
// If datasource already changed, results do not matter
if (datasourceInstance.meta.id !== datasourceId) {
return NEVER;
}
const result = series || delta || [];
const replacePreviousResults = loadingState === LoadingState.Done && series && !delta ? true : false;
const resultProcessor = new ResultProcessor(state$.value.explore[exploreId], replacePreviousResults, result);
const graphResult = resultProcessor.getGraphResult();
const tableResult = resultProcessor.getTableResult();
const logsResult = resultProcessor.getLogsResult();
const refIds = getRefIds(result);
const actions: Array<ActionOf<any>> = [];
// For Angular editors
eventBridge.emit('data-received', resultProcessor.getRawData());
// Clears any previous errors that now have a successful query, important so Angular editors are updated correctly
actions.push(
resetQueryErrorAction({
exploreId,
refIds,
})
);
actions.push(
querySuccessAction({
exploreId,
latency,
loadingState,
graphResult,
tableResult,
logsResult,
})
);
// Keep scanning for results if this was the last scanning transaction
if (scanning) {
if (_.size(result) === 0) {
const range = getShiftedTimeRange(-1, state$.value.explore[exploreId].range, getTimeZone(state$.value.user));
actions.push(updateTimeRangeAction({ exploreId, absoluteRange: range }));
actions.push(runQueriesAction({ exploreId }));
} else {
// We can stop scanning if we have a result
actions.push(scanStopAction({ exploreId }));
}
}
return actions;
})
);
};
import { Epic } from 'redux-observable';
import { Observable, Subject } from 'rxjs';
import { mergeMap, catchError, takeUntil, filter } from 'rxjs/operators';
import _, { isString } from 'lodash';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
import { DataStreamState, DataQueryResponse, DataQueryResponseData } from '@grafana/ui';
import { LoadingState, DataFrame, AbsoluteTimeRange } from '@grafana/data';
import { dateMath } from '@grafana/data';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { buildQueryTransaction, updateHistory } from 'app/core/utils/explore';
import {
clearQueriesAction,
historyUpdatedAction,
resetExploreAction,
updateDatasourceInstanceAction,
changeRefreshIntervalAction,
processQueryErrorsAction,
processQueryResultsAction,
runQueriesBatchAction,
RunQueriesBatchPayload,
queryStartAction,
limitMessageRatePayloadAction,
stateSaveAction,
changeRangeAction,
} from '../actionTypes';
import { ExploreId, ExploreItemState } from 'app/types';
const publishActions = (outerObservable: Subject<any>, actions: Array<ActionOf<any>>) => {
for (const action of actions) {
outerObservable.next(action);
}
};
interface ProcessResponseConfig {
exploreId: ExploreId;
exploreItemState: ExploreItemState;
datasourceId: string;
now: number;
loadingState: LoadingState;
series?: DataQueryResponseData[];
delta?: DataFrame[];
}
const processResponse = (config: ProcessResponseConfig) => {
const { exploreId, exploreItemState, datasourceId, now, loadingState, series, delta } = config;
const { queries, history } = exploreItemState;
const latency = Date.now() - now;
// Side-effect: Saving history in localstorage
const nextHistory = updateHistory(history, datasourceId, queries);
return [
historyUpdatedAction({ exploreId, history: nextHistory }),
processQueryResultsAction({ exploreId, latency, datasourceId, loadingState, series, delta }),
stateSaveAction(),
];
};
interface ProcessErrorConfig {
exploreId: ExploreId;
datasourceId: string;
error: any;
}
const processError = (config: ProcessErrorConfig) => {
const { exploreId, datasourceId, error } = config;
return [processQueryErrorsAction({ exploreId, response: error, datasourceId })];
};
export const runQueriesBatchEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (
action$,
state$,
{ getQueryResponse }
) => {
return action$.ofType(runQueriesBatchAction.type).pipe(
mergeMap((action: ActionOf<RunQueriesBatchPayload>) => {
const { exploreId, queryOptions } = action.payload;
const exploreItemState = state$.value.explore[exploreId];
const { datasourceInstance, queries, queryIntervals, range, scanning } = exploreItemState;
// Create an observable per run queries action
// Within the observable create two subscriptions
// First subscription: 'querySubscription' subscribes to the call to query method on datasourceinstance
// Second subscription: 'streamSubscription' subscribes to events from the query methods observer callback
const observable: Observable<ActionOf<any>> = Observable.create((outerObservable: Subject<any>) => {
const datasourceId = datasourceInstance.meta.id;
const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning);
outerObservable.next(queryStartAction({ exploreId }));
const now = Date.now();
let datasourceUnsubscribe: Function = null;
const streamHandler = new Subject<DataStreamState>();
const observer = (event: DataStreamState) => {
datasourceUnsubscribe = event.unsubscribe;
if (!streamHandler.closed) {
// their might be a race condition when unsubscribing
streamHandler.next(event);
}
};
// observer subscription, handles datasourceInstance.query observer events and pushes that forward
const streamSubscription = streamHandler.subscribe({
next: event => {
const { state, error, data, delta } = event;
if (!data && !delta && !error) {
return;
}
if (state === LoadingState.Error) {
const actions = processError({ exploreId, datasourceId, error });
publishActions(outerObservable, actions);
}
if (state === LoadingState.Streaming) {
if (event.request && event.request.range) {
let newRange = event.request.range;
let absoluteRange: AbsoluteTimeRange = {
from: newRange.from.valueOf(),
to: newRange.to.valueOf(),
};
if (isString(newRange.raw.from)) {
newRange = {
from: dateMath.parse(newRange.raw.from, false),
to: dateMath.parse(newRange.raw.to, true),
raw: newRange.raw,
};
absoluteRange = {
from: newRange.from.valueOf(),
to: newRange.to.valueOf(),
};
}
outerObservable.next(changeRangeAction({ exploreId, range: newRange, absoluteRange }));
}
outerObservable.next(
limitMessageRatePayloadAction({
exploreId,
series: delta,
datasourceId,
})
);
}
if (state === LoadingState.Done || state === LoadingState.Loading) {
const actions = processResponse({
exploreId,
exploreItemState,
datasourceId,
now,
loadingState: state,
series: null,
delta,
});
publishActions(outerObservable, actions);
}
},
});
// query subscription, handles datasourceInstance.query response and pushes that forward
const querySubscription = getQueryResponse(datasourceInstance, transaction.options, observer)
.pipe(
mergeMap((response: DataQueryResponse) => {
return processResponse({
exploreId,
exploreItemState,
datasourceId,
now,
loadingState: LoadingState.Done,
series: response && response.data ? response.data : [],
delta: null,
});
}),
catchError(error => {
return processError({ exploreId, datasourceId, error });
})
)
.subscribe({ next: (action: ActionOf<any>) => outerObservable.next(action) });
// this unsubscribe method will be called when any of the takeUntil actions below happen
const unsubscribe = () => {
if (datasourceUnsubscribe) {
datasourceUnsubscribe();
}
querySubscription.unsubscribe();
streamSubscription.unsubscribe();
streamHandler.unsubscribe();
outerObservable.unsubscribe();
};
return unsubscribe;
});
return observable.pipe(
takeUntil(
action$
.ofType(
runQueriesBatchAction.type,
resetExploreAction.type,
updateDatasourceInstanceAction.type,
changeRefreshIntervalAction.type,
clearQueriesAction.type
)
.pipe(
filter(action => {
if (action.type === resetExploreAction.type) {
return true; // stops all subscriptions if user navigates away
}
if (action.type === updateDatasourceInstanceAction.type && action.payload.exploreId === exploreId) {
return true; // stops subscriptions if user changes data source
}
if (action.type === changeRefreshIntervalAction.type && action.payload.exploreId === exploreId) {
return !isLive(action.payload.refreshInterval); // stops subscriptions if user changes refresh interval away from 'Live'
}
if (action.type === clearQueriesAction.type && action.payload.exploreId === exploreId) {
return true; // stops subscriptions if user clears all queries
}
return action.payload.exploreId === exploreId;
})
)
)
);
})
);
};
import { mockExploreState } from 'test/mocks/mockExploreState';
import { epicTester } from 'test/core/redux/epicTester';
import { runQueriesAction, stateSaveAction, runQueriesBatchAction, clearQueriesAction } from '../actionTypes';
import { runQueriesEpic } from './runQueriesEpic';
describe('runQueriesEpic', () => {
describe('when runQueriesAction is dispatched', () => {
describe('and there is no datasourceError', () => {
describe('and we have non empty queries', () => {
describe('and explore is not live', () => {
it('then runQueriesBatchAction and stateSaveAction are dispatched', () => {
const queries = [{ refId: 'A', key: '123456', expr: '{__filename__="some.log"}' }];
const { exploreId, state, datasourceInterval, containerWidth } = mockExploreState({ queries });
epicTester(runQueriesEpic, state)
.whenActionIsDispatched(runQueriesAction({ exploreId }))
.thenResultingActionsEqual(
runQueriesBatchAction({
exploreId,
queryOptions: { interval: datasourceInterval, maxDataPoints: containerWidth, live: false },
})
);
});
});
describe('and explore is live', () => {
it('then runQueriesBatchAction and stateSaveAction are dispatched', () => {
const queries = [{ refId: 'A', key: '123456', expr: '{__filename__="some.log"}' }];
const { exploreId, state, datasourceInterval, containerWidth } = mockExploreState({
queries,
isLive: true,
streaming: true,
});
epicTester(runQueriesEpic, state)
.whenActionIsDispatched(runQueriesAction({ exploreId }))
.thenResultingActionsEqual(
runQueriesBatchAction({
exploreId,
queryOptions: { interval: datasourceInterval, maxDataPoints: containerWidth, live: true },
})
);
});
});
});
describe('and we have no queries', () => {
it('then clearQueriesAction and stateSaveAction are dispatched', () => {
const queries: any[] = [];
const { exploreId, state } = mockExploreState({ queries });
epicTester(runQueriesEpic, state)
.whenActionIsDispatched(runQueriesAction({ exploreId }))
.thenResultingActionsEqual(clearQueriesAction({ exploreId }), stateSaveAction());
});
});
});
describe('and there is a datasourceError', () => {
it('then no actions are dispatched', () => {
const { exploreId, state } = mockExploreState({
datasourceError: { message: 'Some error' },
});
epicTester(runQueriesEpic, state)
.whenActionIsDispatched(runQueriesAction({ exploreId }))
.thenNoActionsWhereDispatched();
});
});
});
});
import { Epic } from 'redux-observable';
import { NEVER } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { hasNonEmptyQuery } from 'app/core/utils/explore';
import {
clearQueriesAction,
runQueriesAction,
RunQueriesPayload,
runQueriesBatchAction,
stateSaveAction,
} from '../actionTypes';
export const runQueriesEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
return action$.ofType(runQueriesAction.type).pipe(
mergeMap((action: ActionOf<RunQueriesPayload>) => {
const { exploreId } = action.payload;
const { datasourceInstance, queries, datasourceError, containerWidth, isLive } = state$.value.explore[exploreId];
if (datasourceError) {
// let's not run any queries if data source is in a faulty state
return NEVER;
}
if (!hasNonEmptyQuery(queries)) {
return [clearQueriesAction({ exploreId }), stateSaveAction()]; // Remember to save to state and update location
}
// Some datasource's query builders allow per-query interval limits,
// but we're using the datasource interval limit for now
const interval = datasourceInstance.interval;
const live = isLive;
return [runQueriesBatchAction({ exploreId, queryOptions: { interval, maxDataPoints: containerWidth, live } })];
})
);
};
import { epicTester } from 'test/core/redux/epicTester';
import { stateSaveEpic } from './stateSaveEpic';
import { stateSaveAction, setUrlReplacedAction } from '../actionTypes';
import { updateLocation } from 'app/core/actions/location';
import { mockExploreState } from 'test/mocks/mockExploreState';
describe('stateSaveEpic', () => {
describe('when stateSaveAction is dispatched', () => {
describe('and there is a left state', () => {
describe('and no split', () => {
it('then the correct actions are dispatched', () => {
const { exploreId, state } = mockExploreState();
epicTester(stateSaveEpic, state)
.whenActionIsDispatched(stateSaveAction())
.thenResultingActionsEqual(
updateLocation({
query: { orgId: '1', left: '["now-6h","now","test",{"mode":null},{"ui":[true,true,true,null]}]' },
replace: true,
}),
setUrlReplacedAction({ exploreId })
);
});
});
describe('and explore is split', () => {
it('then the correct actions are dispatched', () => {
const { exploreId, state } = mockExploreState({ split: true });
epicTester(stateSaveEpic, state)
.whenActionIsDispatched(stateSaveAction())
.thenResultingActionsEqual(
updateLocation({
query: {
orgId: '1',
left: '["now-6h","now","test",{"mode":null},{"ui":[true,true,true,null]}]',
right: '["now-6h","now","test",{"mode":null},{"ui":[true,true,true,null]}]',
},
replace: true,
}),
setUrlReplacedAction({ exploreId })
);
});
});
});
describe('and urlReplaced is true', () => {
it('then setUrlReplacedAction should not be dispatched', () => {
const { state } = mockExploreState({ urlReplaced: true });
epicTester(stateSaveEpic, state)
.whenActionIsDispatched(stateSaveAction())
.thenResultingActionsEqual(
updateLocation({
query: { orgId: '1', left: '["now-6h","now","test",{"mode":null},{"ui":[true,true,true,null]}]' },
replace: false,
})
);
});
});
});
});
import { Epic } from 'redux-observable';
import { mergeMap } from 'rxjs/operators';
import { RawTimeRange, TimeRange } from '@grafana/data';
import { isDateTime } from '@grafana/data';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { ExploreUrlState, ExploreId } from 'app/types/explore';
import { clearQueryKeys, serializeStateToUrlParam } from 'app/core/utils/explore';
import { updateLocation } from 'app/core/actions/location';
import { setUrlReplacedAction, stateSaveAction } from '../actionTypes';
const toRawTimeRange = (range: TimeRange): RawTimeRange => {
let from = range.raw.from;
if (isDateTime(from)) {
from = from.valueOf().toString(10);
}
let to = range.raw.to;
if (isDateTime(to)) {
to = to.valueOf().toString(10);
}
return {
from,
to,
};
};
export const stateSaveEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
return action$.ofType(stateSaveAction.type).pipe(
mergeMap(() => {
const { left, right, split } = state$.value.explore;
const orgId = state$.value.user.orgId.toString();
const replace = left && left.urlReplaced === false;
const urlStates: { [index: string]: string } = { orgId };
const leftUrlState: ExploreUrlState = {
datasource: left.datasourceInstance.name,
queries: left.queries.map(clearQueryKeys),
range: toRawTimeRange(left.range),
mode: left.mode,
ui: {
showingGraph: left.showingGraph,
showingLogs: true,
showingTable: left.showingTable,
dedupStrategy: left.dedupStrategy,
},
};
urlStates.left = serializeStateToUrlParam(leftUrlState, true);
if (split) {
const rightUrlState: ExploreUrlState = {
datasource: right.datasourceInstance.name,
queries: right.queries.map(clearQueryKeys),
range: toRawTimeRange(right.range),
mode: right.mode,
ui: {
showingGraph: right.showingGraph,
showingLogs: true,
showingTable: right.showingTable,
dedupStrategy: right.dedupStrategy,
},
};
urlStates.right = serializeStateToUrlParam(rightUrlState, true);
}
const actions: Array<ActionOf<any>> = [updateLocation({ query: urlStates, replace })];
if (replace) {
actions.push(setUrlReplacedAction({ exploreId: ExploreId.left }));
}
return actions;
})
);
};
import { dateTime, DefaultTimeZone } from '@grafana/data';
import { epicTester } from 'test/core/redux/epicTester';
import { mockExploreState } from 'test/mocks/mockExploreState';
import { timeEpic } from './timeEpic';
import { updateTimeRangeAction, changeRangeAction } from '../actionTypes';
import { EpicDependencies } from 'app/store/configureStore';
const from = dateTime('2019-01-01 10:00:00.000Z');
const to = dateTime('2019-01-01 16:00:00.000Z');
const rawFrom = 'now-6h';
const rawTo = 'now';
const rangeMock = {
from,
to,
raw: {
from: rawFrom,
to: rawTo,
},
};
describe('timeEpic', () => {
describe('when updateTimeRangeAction is dispatched', () => {
describe('and no rawRange is supplied', () => {
describe('and no absoluteRange is supplied', () => {
it('then the correct actions are dispatched', () => {
const { exploreId, state, range } = mockExploreState({ range: rangeMock });
const absoluteRange = { from: range.from.valueOf(), to: range.to.valueOf() };
const stateToTest = { ...state, user: { timeZone: 'browser', orgId: -1 } };
const getTimeRange = jest.fn().mockReturnValue(rangeMock);
const dependencies: Partial<EpicDependencies> = {
getTimeRange,
};
epicTester(timeEpic, stateToTest, dependencies)
.whenActionIsDispatched(updateTimeRangeAction({ exploreId }))
.thenDependencyWasCalledTimes(1, 'getTimeSrv', 'init')
.thenDependencyWasCalledTimes(1, 'getTimeRange')
.thenDependencyWasCalledWith([DefaultTimeZone, rangeMock.raw], 'getTimeRange')
.thenResultingActionsEqual(
changeRangeAction({
exploreId,
range,
absoluteRange,
})
);
});
});
describe('and absoluteRange is supplied', () => {
it('then the correct actions are dispatched', () => {
const { exploreId, state, range } = mockExploreState({ range: rangeMock });
const absoluteRange = { from: range.from.valueOf(), to: range.to.valueOf() };
const stateToTest = { ...state, user: { timeZone: 'browser', orgId: -1 } };
const getTimeRange = jest.fn().mockReturnValue(rangeMock);
const dependencies: Partial<EpicDependencies> = {
getTimeRange,
};
epicTester(timeEpic, stateToTest, dependencies)
.whenActionIsDispatched(updateTimeRangeAction({ exploreId, absoluteRange }))
.thenDependencyWasCalledTimes(1, 'getTimeSrv', 'init')
.thenDependencyWasCalledTimes(1, 'getTimeRange')
.thenDependencyWasCalledWith([DefaultTimeZone, { from: null, to: null }], 'getTimeRange')
.thenDependencyWasCalledTimes(2, 'dateTimeForTimeZone')
.thenResultingActionsEqual(
changeRangeAction({
exploreId,
range,
absoluteRange,
})
);
});
});
});
describe('and rawRange is supplied', () => {
describe('and no absoluteRange is supplied', () => {
it('then the correct actions are dispatched', () => {
const { exploreId, state, range } = mockExploreState({ range: rangeMock });
const rawRange = { from: 'now-5m', to: 'now' };
const absoluteRange = { from: range.from.valueOf(), to: range.to.valueOf() };
const stateToTest = { ...state, user: { timeZone: 'browser', orgId: -1 } };
const getTimeRange = jest.fn().mockReturnValue(rangeMock);
const dependencies: Partial<EpicDependencies> = {
getTimeRange,
};
epicTester(timeEpic, stateToTest, dependencies)
.whenActionIsDispatched(updateTimeRangeAction({ exploreId, rawRange }))
.thenDependencyWasCalledTimes(1, 'getTimeSrv', 'init')
.thenDependencyWasCalledTimes(1, 'getTimeRange')
.thenDependencyWasCalledWith([DefaultTimeZone, rawRange], 'getTimeRange')
.thenResultingActionsEqual(
changeRangeAction({
exploreId,
range,
absoluteRange,
})
);
});
});
});
});
});
import { Epic } from 'redux-observable';
import { map } from 'rxjs/operators';
import { AbsoluteTimeRange, RawTimeRange } from '@grafana/data';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { updateTimeRangeAction, UpdateTimeRangePayload, changeRangeAction } from '../actionTypes';
import { EpicDependencies } from 'app/store/configureStore';
export const timeEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies> = (
action$,
state$,
{ getTimeSrv, getTimeRange, getTimeZone, dateTimeForTimeZone }
) => {
return action$.ofType(updateTimeRangeAction.type).pipe(
map((action: ActionOf<UpdateTimeRangePayload>) => {
const { exploreId, absoluteRange: absRange, rawRange: actionRange } = action.payload;
const itemState = state$.value.explore[exploreId];
const timeZone = getTimeZone(state$.value.user);
const { range: rangeInState } = itemState;
let rawRange: RawTimeRange = rangeInState.raw;
if (absRange) {
rawRange = {
from: dateTimeForTimeZone(timeZone, absRange.from),
to: dateTimeForTimeZone(timeZone, absRange.to),
};
}
if (actionRange) {
rawRange = actionRange;
}
const range = getTimeRange(timeZone, rawRange);
const absoluteRange: AbsoluteTimeRange = { from: range.from.valueOf(), to: range.to.valueOf() };
getTimeSrv().init({
time: range.raw,
refresh: false,
getTimezone: () => timeZone,
timeRangeUpdated: (): any => undefined,
});
return changeRangeAction({ exploreId, range, absoluteRange });
})
);
};
......@@ -26,12 +26,14 @@ import { serializeStateToUrlParam } from 'app/core/utils/explore';
import TableModel from 'app/core/table_model';
import { DataSourceApi, DataQuery } from '@grafana/ui';
import { LogsModel, LogsDedupStrategy, LoadingState } from '@grafana/data';
import { PanelQueryState } from '../../dashboard/state/PanelQueryState';
describe('Explore item reducer', () => {
describe('scanning', () => {
it('should start scanning', () => {
const initalState = {
...makeExploreItemState(),
queryState: null as PanelQueryState,
scanning: false,
};
......@@ -40,12 +42,14 @@ describe('Explore item reducer', () => {
.whenActionIsDispatched(scanStartAction({ exploreId: ExploreId.left }))
.thenStateShouldEqual({
...makeExploreItemState(),
queryState: null as PanelQueryState,
scanning: true,
});
});
it('should stop scanning', () => {
const initalState = {
...makeExploreItemState(),
queryState: null as PanelQueryState,
scanning: true,
scanRange: {},
};
......@@ -55,6 +59,7 @@ describe('Explore item reducer', () => {
.whenActionIsDispatched(scanStopAction({ exploreId: ExploreId.left }))
.thenStateShouldEqual({
...makeExploreItemState(),
queryState: null as PanelQueryState,
scanning: false,
scanRange: undefined,
});
......
......@@ -7,6 +7,7 @@ import {
DEFAULT_UI_STATE,
generateNewKeyAndAddRefIdIfMissing,
sortLogsResult,
stopQueryState,
refreshIntervalToSortOrder,
} from 'app/core/utils/explore';
import { ExploreItemState, ExploreState, ExploreId, ExploreUpdateState, ExploreMode } from 'app/types/explore';
......@@ -31,9 +32,6 @@ import {
queryStartAction,
runQueriesAction,
changeRangeAction,
} from './actionTypes';
import { reducerFactory } from 'app/core/redux';
import {
addQueryRowAction,
changeQueryAction,
changeSizeAction,
......@@ -53,11 +51,15 @@ import {
queriesImportedAction,
updateUIStateAction,
toggleLogLevelAction,
changeLoadingStateAction,
resetExploreAction,
} from './actionTypes';
import { reducerFactory } from 'app/core/redux';
import { updateLocation } from 'app/core/actions/location';
import { LocationUpdate } from '@grafana/runtime';
import TableModel from 'app/core/table_model';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
import { PanelQueryState } from '../../dashboard/state/PanelQueryState';
export const DEFAULT_RANGE = {
from: 'now-6h',
......@@ -114,6 +116,7 @@ export const makeExploreItemState = (): ExploreItemState => ({
mode: null,
isLive: false,
urlReplaced: false,
queryState: new PanelQueryState(),
});
/**
......@@ -186,6 +189,9 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
const live = isLive(refreshInterval);
const sortOrder = refreshIntervalToSortOrder(refreshInterval);
const logsResult = sortLogsResult(state.logsResult, sortOrder);
if (isLive(state.refreshInterval) && !live) {
stopQueryState(state.queryState, 'Live streaming stopped');
}
return {
...state,
......@@ -200,6 +206,7 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
filter: clearQueriesAction,
mapper: (state): ExploreItemState => {
const queries = ensureQueries();
stopQueryState(state.queryState, 'Queries cleared');
return {
...state,
queries: queries.slice(),
......@@ -258,6 +265,7 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
// Custom components
const StartPage = datasourceInstance.components.ExploreStartPage;
stopQueryState(state.queryState, 'Datasource changed');
return {
...state,
......@@ -577,6 +585,16 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
};
},
})
.addMapper({
filter: changeLoadingStateAction,
mapper: (state, action): ExploreItemState => {
const { loadingState } = action.payload;
return {
...state,
loadingState,
};
},
})
.create();
export const updateChildRefreshState = (
......@@ -664,6 +682,19 @@ export const exploreReducer = (state = initialExploreState, action: HigherOrderA
[ExploreId.right]: updateChildRefreshState(rightState, action.payload, ExploreId.right),
};
}
case resetExploreAction.type: {
const leftState = state[ExploreId.left];
const rightState = state[ExploreId.right];
stopQueryState(leftState.queryState, 'Navigated away from Explore');
stopQueryState(rightState.queryState, 'Navigated away from Explore');
return {
...state,
[ExploreId.left]: updateChildRefreshState(leftState, action.payload, ExploreId.left),
[ExploreId.right]: updateChildRefreshState(rightState, action.payload, ExploreId.right),
};
}
}
if (action.payload) {
......
......@@ -3,14 +3,12 @@ import _ from 'lodash';
import { Subscription, of } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';
import { catchError, map } from 'rxjs/operators';
// Services & Utils
import { dateMath } from '@grafana/data';
import { dateMath, DataFrame, LogRowModel, LoadingState, DateTime } from '@grafana/data';
import { addLabelToSelector } from 'app/plugins/datasource/prometheus/add_label_to_query';
import LanguageProvider from './language_provider';
import { logStreamToDataFrame } from './result_transformer';
import { formatQuery, parseQuery, getHighlighterExpressionsFromQuery } from './query_utils';
// Types
import {
PluginMeta,
......@@ -22,8 +20,6 @@ import {
DataStreamState,
DataQueryResponse,
} from '@grafana/ui';
import { DataFrame, LogRowModel, LoadingState, DateTime } from '@grafana/data';
import { LokiQuery, LokiOptions } from './types';
import { BackendSrv } from 'app/core/services/backend_srv';
import { TemplateSrv } from 'app/features/templating/template_srv';
......@@ -179,12 +175,12 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
const subscription = webSocket(liveTarget.url)
.pipe(
map((results: any[]) => {
const delta = this.processResult(results, liveTarget);
const data = this.processResult(results, liveTarget);
const state: DataStreamState = {
key: `loki-${liveTarget.refId}`,
request: options,
state: LoadingState.Streaming,
delta,
data,
unsubscribe: () => this.unsubscribe(liveTarget.refId),
};
......
// Libraries
import _ from 'lodash';
import $ from 'jquery';
import { from, of, Observable } from 'rxjs';
import { single, map, filter, catchError } from 'rxjs/operators';
// Services & Utils
import kbn from 'app/core/utils/kbn';
import { dateMath } from '@grafana/data';
import { dateMath, TimeRange, DateTime, AnnotationEvent, LoadingState } from '@grafana/data';
import { Observable, from, of } from 'rxjs';
import { single, filter, mergeMap, catchError } from 'rxjs/operators';
import PrometheusMetricFindQuery from './metric_find_query';
import { ResultTransformer } from './result_transformer';
import PrometheusLanguageProvider from './language_provider';
......@@ -14,7 +14,6 @@ import { BackendSrv } from 'app/core/services/backend_srv';
import addLabelToQuery from './add_label_to_query';
import { getQueryHints } from './query_hints';
import { expandRecordingRules } from './language_utils';
// Types
import { PromQuery, PromOptions, PromQueryRequest, PromContext } from './types';
import {
......@@ -23,14 +22,13 @@ import {
DataSourceInstanceSettings,
DataQueryError,
DataStreamObserver,
DataStreamState,
DataQueryResponseData,
DataStreamState,
} from '@grafana/ui';
import { ExploreUrlState } from 'app/types/explore';
import { safeStringifyValue } from 'app/core/utils/explore';
import { TemplateSrv } from 'app/features/templating/template_srv';
import { TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { TimeRange, DateTime, LoadingState, AnnotationEvent } from '@grafana/data';
export interface PromDataQueryResponse {
data: {
......@@ -183,6 +181,26 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
activeTargets: PromQuery[],
end: number
) => {
// Because we want to get run instant and TimeSeries Prom queries in parallel but this isn't actually streaming
// we need to stop/cancel each posted event with a stop stream event (see below) to the observer so that the
// PanelQueryState stops the stream
const getStopState = (state: DataStreamState): DataStreamState => ({
...state,
state: LoadingState.Done,
request: { ...options, requestId: 'done' },
});
const startLoadingEvent: DataStreamState = {
key: `prometheus-loading_indicator`,
state: LoadingState.Loading,
request: options,
data: [],
unsubscribe: () => undefined,
};
observer(startLoadingEvent); // Starts the loading indicator
const lastTimeSeriesQuery = queries.filter(query => !query.instant).pop();
for (let index = 0; index < queries.length; index++) {
const query = queries[index];
const target = activeTargets[index];
......@@ -198,17 +216,23 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
.pipe(
single(), // unsubscribes automatically after first result
filter((response: any) => (response.cancelled ? false : true)),
map((response: any) => {
const delta = this.processResult(response, query, target, queries.length);
mergeMap((response: any) => {
const data = this.processResult(response, query, target, queries.length);
const state: DataStreamState = {
key: `prometheus-${target.refId}`,
state: query.instant ? LoadingState.Loading : LoadingState.Done,
state: LoadingState.Loading,
request: options,
delta,
data,
unsubscribe: () => undefined,
};
return state;
const states = [state, getStopState(state)];
if (target.refId === lastTimeSeriesQuery.refId && target.expr === lastTimeSeriesQuery.expr) {
states.push(getStopState(startLoadingEvent)); // Stops the loading indicator
}
return states;
}),
catchError(err => {
const error = this.handleErrors(err, target);
......@@ -282,7 +306,6 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
this.runObserverQueries(options, observer, queries, activeTargets, end);
return this.$q.when({ data: [] }) as Promise<{ data: any }>;
}
const allQueryPromise = _.map(queries, query => {
if (query.instant) {
return this.performInstantQuery(query, end);
......
import { createStore, applyMiddleware, compose, combineReducers } from 'redux';
import thunk from 'redux-thunk';
import { combineEpics, createEpicMiddleware } from 'redux-observable';
import { createLogger } from 'redux-logger';
import sharedReducers from 'app/core/reducers';
import alertingReducers from 'app/features/alerting/state/reducers';
......@@ -15,41 +14,8 @@ import usersReducers from 'app/features/users/state/reducers';
import userReducers from 'app/features/profile/state/reducers';
import organizationReducers from 'app/features/org/state/reducers';
import { setStore } from './store';
import { limitMessageRateEpic } from 'app/features/explore/state/epics/limitMessageRateEpic';
import { stateSaveEpic } from 'app/features/explore/state/epics/stateSaveEpic';
import { processQueryResultsEpic } from 'app/features/explore/state/epics/processQueryResultsEpic';
import { processQueryErrorsEpic } from 'app/features/explore/state/epics/processQueryErrorsEpic';
import { runQueriesEpic } from 'app/features/explore/state/epics/runQueriesEpic';
import { runQueriesBatchEpic } from 'app/features/explore/state/epics/runQueriesBatchEpic';
import {
DataSourceApi,
DataQueryResponse,
DataQuery,
DataSourceJsonData,
DataQueryRequest,
DataStreamObserver,
} from '@grafana/ui';
import {
TimeZone,
RawTimeRange,
TimeRange,
DateTimeInput,
FormatInput,
DateTime,
AbsoluteTimeRange,
dateTimeForTimeZone,
} from '@grafana/data';
import { Observable } from 'rxjs';
import { getQueryResponse } from 'app/core/utils/explore';
import { StoreState } from 'app/types/store';
import { toggleLogActionsMiddleware } from 'app/core/middlewares/application';
import { timeEpic } from 'app/features/explore/state/epics/timeEpic';
import { TimeSrv, getTimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { UserState } from 'app/types/user';
import { getTimeRange } from 'app/core/utils/explore';
import { getTimeZone } from 'app/features/profile/state/selectors';
import { getShiftedTimeRange } from 'app/core/utils/timePicker';
const rootReducers = {
...sharedReducers,
......@@ -70,40 +36,6 @@ export function addRootReducer(reducers: any) {
Object.assign(rootReducers, ...reducers);
}
export const rootEpic: any = combineEpics(
limitMessageRateEpic,
stateSaveEpic,
runQueriesEpic,
runQueriesBatchEpic,
processQueryResultsEpic,
processQueryErrorsEpic,
timeEpic
);
export interface EpicDependencies {
getQueryResponse: (
datasourceInstance: DataSourceApi<DataQuery, DataSourceJsonData>,
options: DataQueryRequest<DataQuery>,
observer?: DataStreamObserver
) => Observable<DataQueryResponse>;
getTimeSrv: () => TimeSrv;
getTimeRange: (timeZone: TimeZone, rawRange: RawTimeRange) => TimeRange;
getTimeZone: (state: UserState) => TimeZone;
getShiftedTimeRange: (direction: number, origRange: TimeRange, timeZone: TimeZone) => AbsoluteTimeRange;
dateTimeForTimeZone: (timezone?: TimeZone, input?: DateTimeInput, formatInput?: FormatInput) => DateTime;
}
const dependencies: EpicDependencies = {
getQueryResponse,
getTimeSrv,
getTimeRange,
getTimeZone,
getShiftedTimeRange,
dateTimeForTimeZone,
};
const epicMiddleware = createEpicMiddleware({ dependencies });
export function configureStore() {
const composeEnhancers = (window as any).__REDUX_DEVTOOLS_EXTENSION_COMPOSE__ || compose;
const rootReducer = combineReducers(rootReducers);
......@@ -114,11 +46,10 @@ export function configureStore() {
});
const storeEnhancers =
process.env.NODE_ENV !== 'production'
? applyMiddleware(toggleLogActionsMiddleware, thunk, epicMiddleware, logger)
: applyMiddleware(thunk, epicMiddleware);
? applyMiddleware(toggleLogActionsMiddleware, thunk, logger)
: applyMiddleware(thunk);
const store = createStore(rootReducer, {}, composeEnhancers(storeEnhancers));
setStore(store);
epicMiddleware.run(rootEpic);
return store;
}
......@@ -21,6 +21,7 @@ import {
import { Emitter } from 'app/core/core';
import TableModel from 'app/core/table_model';
import { PanelQueryState } from '../features/dashboard/state/PanelQueryState';
export enum ExploreMode {
Metrics = 'Metrics',
......@@ -255,6 +256,8 @@ export interface ExploreItemState {
isLive: boolean;
urlReplaced: boolean;
queryState: PanelQueryState;
}
export interface ExploreUpdateState {
......
import { Epic, ActionsObservable, StateObservable } from 'redux-observable';
import { Subject } from 'rxjs';
import {
DataSourceApi,
DataQuery,
DataSourceJsonData,
DataQueryRequest,
DataStreamObserver,
DataQueryResponse,
DataStreamState,
} from '@grafana/ui';
import { DefaultTimeZone } from '@grafana/data';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { EpicDependencies } from 'app/store/configureStore';
import { TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { DEFAULT_RANGE } from 'app/core/utils/explore';
export const MOCKED_ABSOLUTE_RANGE = { from: 1, to: 2 };
export const epicTester = (
epic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies>,
state?: Partial<StoreState>,
dependencies?: Partial<EpicDependencies>
) => {
const resultingActions: Array<ActionOf<any>> = [];
const action$ = new Subject<ActionOf<any>>();
const state$ = new Subject<StoreState>();
const actionObservable$ = new ActionsObservable(action$);
const stateObservable$ = new StateObservable(state$, (state as StoreState) || ({} as StoreState));
const queryResponse$ = new Subject<DataQueryResponse>();
const observer$ = new Subject<DataStreamState>();
const getQueryResponse = (
datasourceInstance: DataSourceApi<DataQuery, DataSourceJsonData>,
options: DataQueryRequest<DataQuery>,
observer?: DataStreamObserver
) => {
if (observer) {
observer$.subscribe({ next: event => observer(event) });
}
return queryResponse$;
};
const init = jest.fn();
const getTimeSrv = (): TimeSrv => {
const timeSrvMock: TimeSrv = {} as TimeSrv;
return Object.assign(timeSrvMock, { init });
};
const getTimeRange = jest.fn().mockReturnValue(DEFAULT_RANGE);
const getShiftedTimeRange = jest.fn().mockReturnValue(MOCKED_ABSOLUTE_RANGE);
const getTimeZone = jest.fn().mockReturnValue(DefaultTimeZone);
const dateTimeForTimeZone = jest.fn().mockReturnValue(null);
const defaultDependencies: EpicDependencies = {
getQueryResponse,
getTimeSrv,
getTimeRange,
getTimeZone,
getShiftedTimeRange,
dateTimeForTimeZone,
};
const theDependencies: EpicDependencies = { ...defaultDependencies, ...dependencies };
epic(actionObservable$, stateObservable$, theDependencies).subscribe({
next: action => resultingActions.push(action),
});
const whenActionIsDispatched = (action: ActionOf<any>) => {
action$.next(action);
return instance;
};
const whenQueryReceivesResponse = (response: DataQueryResponse) => {
queryResponse$.next(response);
return instance;
};
const whenQueryThrowsError = (error: any) => {
queryResponse$.error(error);
return instance;
};
const whenQueryObserverReceivesEvent = (event: DataStreamState) => {
observer$.next(event);
return instance;
};
const thenResultingActionsEqual = (...actions: Array<ActionOf<any>>) => {
expect(actions).toEqual(resultingActions);
return instance;
};
const thenNoActionsWhereDispatched = () => {
expect(resultingActions).toEqual([]);
return instance;
};
const getDependencyMock = (dependency: string, method?: string) => {
// @ts-ignore
const dep = theDependencies[dependency];
let mock = null;
if (dep instanceof Function) {
mock = method ? dep()[method] : dep();
} else {
mock = method ? dep[method] : dep;
}
return mock;
};
const thenDependencyWasCalledTimes = (times: number, dependency: string, method?: string) => {
const mock = getDependencyMock(dependency, method);
expect(mock).toBeCalledTimes(times);
return instance;
};
const thenDependencyWasCalledWith = (args: any[], dependency: string, method?: string) => {
const mock = getDependencyMock(dependency, method);
expect(mock).toBeCalledWith(...args);
return instance;
};
const instance = {
whenActionIsDispatched,
whenQueryReceivesResponse,
whenQueryThrowsError,
whenQueryObserverReceivesEvent,
thenResultingActionsEqual,
thenNoActionsWhereDispatched,
thenDependencyWasCalledTimes,
thenDependencyWasCalledWith,
};
return instance;
};
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment