Commit fb39831d by Hugo Häggmark Committed by GitHub

Explore: Queries the datasource once per run query and uses DataStreamObserver (#17263)

* Refactor: Removes replaceUrl from actions

* Refactor: Moves saveState thunk to epic

* Refactor: Moves thunks to epics

* Wip: removes resulttype and queries once

* Refactor: LiveTailing uses observer in query

* Refactor: Creates epics folder for epics and move back actioncreators

* Tests: Adds tests for epics and reducer

* Fix: Checks for undefined as well

* Refactor: Cleans up previous live tailing implementation

* Chore: merge with master

* Fix: Fixes url issuses and prom graph in Panels

* Refactor: Removes supportsStreaming and adds sockets to DataSourcePluginMeta instead

* Refactor: Changes the way we create TimeSeries

* Refactor: Renames sockets to streaming

* Refactor: Changes the way Explore does incremental updates

* Refactor: Removes unused method

* Refactor: Adds back Loading indication
parent 5761179a
......@@ -38,7 +38,7 @@ export class SetInterval extends PureComponent<Props> {
}
componentDidUpdate(prevProps: Props) {
if (_.isEqual(prevProps, this.props)) {
if ((isLive(prevProps.interval) && isLive(this.props.interval)) || _.isEqual(prevProps, this.props)) {
return;
}
......
......@@ -83,7 +83,7 @@ export interface DataSourcePluginMeta extends PluginMeta {
category?: string;
queryOptions?: PluginMetaQueryOptions;
sort?: number;
supportsStreaming?: boolean;
streaming?: boolean;
/**
* By default, hidden queries are not passed to the datasource
......@@ -164,10 +164,6 @@ export abstract class DataSourceApi<
*/
abstract query(options: DataQueryRequest<TQuery>, observer?: DataStreamObserver): Promise<DataQueryResponse>;
convertToStreamTargets?(options: DataQueryRequest<TQuery>): Array<{ url: string; refId: string }>;
resultToSeriesData?(data: any, refId: string): SeriesData[];
/**
* Test & verify datasource settings & connection details
*/
......
......@@ -160,6 +160,7 @@ export const toLegacyResponseData = (series: SeriesData): TimeSeries | TableData
const type = guessFieldTypeFromSeries(series, 1);
if (type === FieldType.time) {
return {
alias: fields[0].name || series.name,
target: fields[0].name || series.name,
datapoints: rows,
unit: fields[0].unit,
......
......@@ -29,6 +29,7 @@ type DataSourcePlugin struct {
BuiltIn bool `json:"builtIn,omitempty"`
Mixed bool `json:"mixed,omitempty"`
Routes []*AppPluginRoute `json:"routes"`
Streaming bool `json:"streaming"`
Backend bool `json:"backend,omitempty"`
Executable string `json:"executable,omitempty"`
......
......@@ -329,7 +329,7 @@ export default class TimeSeries {
isMsResolutionNeeded() {
for (let i = 0; i < this.datapoints.length; i++) {
if (this.datapoints[i][1] !== null) {
if (this.datapoints[i][1] !== null && this.datapoints[i][1] !== undefined) {
const timestamp = this.datapoints[i][1].toString();
if (timestamp.length === 13 && timestamp % 1000 !== 0) {
return true;
......
// Libraries
import _ from 'lodash';
import { from } from 'rxjs';
import { toUtc } from '@grafana/ui/src/utils/moment_wrapper';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
// Services & Utils
import * as dateMath from '@grafana/ui/src/utils/datemath';
import { renderUrl } from 'app/core/utils/url';
import kbn from 'app/core/utils/kbn';
import store from 'app/core/store';
import TableModel, { mergeTablesIntoModel } from 'app/core/table_model';
import { getNextRefIdChar } from './query';
// Types
import {
colors,
TimeRange,
RawTimeRange,
TimeZone,
IntervalValues,
DataQuery,
DataSourceApi,
toSeriesData,
guessFieldTypes,
TimeFragment,
DataQueryError,
LogRowModel,
LogsModel,
LogsDedupStrategy,
DataSourceJsonData,
DataQueryRequest,
DataStreamObserver,
} from '@grafana/ui';
import TimeSeries from 'app/core/time_series2';
import {
ExploreUrlState,
HistoryItem,
QueryTransaction,
ResultType,
QueryIntervals,
QueryOptions,
ResultGetter,
} from 'app/types/explore';
import { seriesDataToLogsModel } from 'app/core/logs_model';
import { toUtc } from '@grafana/ui/src/utils/moment_wrapper';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
import { ExploreUrlState, HistoryItem, QueryTransaction, QueryIntervals, QueryOptions } from 'app/types/explore';
import { config } from '../config';
export const DEFAULT_RANGE = {
from: 'now-6h',
......@@ -116,7 +107,6 @@ export async function getExploreUrl(
export function buildQueryTransaction(
queries: DataQuery[],
resultType: ResultType,
queryOptions: QueryOptions,
range: TimeRange,
queryIntervals: QueryIntervals,
......@@ -137,7 +127,7 @@ export function buildQueryTransaction(
// Using `format` here because it relates to the view panel that the request is for.
// However, some datasources don't use `panelId + query.refId`, but only `panelId`.
// Therefore panel id has to be unique.
const panelId = `${queryOptions.format}-${key}`;
const panelId = `${key}`;
const options = {
interval,
......@@ -156,7 +146,6 @@ export function buildQueryTransaction(
return {
queries,
options,
resultType,
scanning,
id: generateKey(), // reusing for unique ID
done: false,
......@@ -328,28 +317,6 @@ export function hasNonEmptyQuery<TQuery extends DataQuery = any>(queries: TQuery
);
}
export function calculateResultsFromQueryTransactions(result: any, resultType: ResultType, graphInterval: number) {
const flattenedResult: any[] = _.flatten(result);
const graphResult = resultType === 'Graph' && result ? result : null;
const tableResult =
resultType === 'Table' && result
? mergeTablesIntoModel(
new TableModel(),
...flattenedResult.filter((r: any) => r.columns && r.rows).map((r: any) => r as TableModel)
)
: mergeTablesIntoModel(new TableModel());
const logsResult =
resultType === 'Logs' && result
? seriesDataToLogsModel(flattenedResult.map(r => guessFieldTypes(toSeriesData(r))), graphInterval)
: null;
return {
graphResult,
tableResult,
logsResult,
};
}
export function getIntervals(range: TimeRange, lowLimit: string, resolution: number): IntervalValues {
if (!resolution) {
return { interval: '1s', intervalMs: 1000 };
......@@ -358,37 +325,6 @@ export function getIntervals(range: TimeRange, lowLimit: string, resolution: num
return kbn.calculateInterval(range, resolution, lowLimit);
}
export const makeTimeSeriesList: ResultGetter = (dataList, transaction, allTransactions) => {
// Prevent multiple Graph transactions to have the same colors
let colorIndexOffset = 0;
for (const other of allTransactions) {
// Only need to consider transactions that came before the current one
if (other === transaction) {
break;
}
// Count timeseries of previous query results
if (other.resultType === 'Graph' && other.done) {
colorIndexOffset += other.result.length;
}
}
return dataList.map((seriesData, index: number) => {
const datapoints = seriesData.datapoints || [];
const alias = seriesData.target;
const colorIndex = (colorIndexOffset + index) % colors.length;
const color = colors[colorIndex];
const series = new TimeSeries({
datapoints,
alias,
color,
unit: seriesData.unit,
});
return series;
});
};
/**
* Update the query history. Side-effect: store history in local storage
*/
......@@ -566,3 +502,20 @@ export const sortLogsResult = (logsResult: LogsModel, refreshInterval: string) =
return result;
};
export const convertToWebSocketUrl = (url: string) => {
const protocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
let backend = `${protocol}${window.location.host}${config.appSubUrl}`;
if (backend.endsWith('/')) {
backend = backend.slice(0, backend.length - 1);
}
return `${backend}${url}`;
};
export const getQueryResponse = (
datasourceInstance: DataSourceApi<DataQuery, DataSourceJsonData>,
options: DataQueryRequest<DataQuery>,
observer?: DataStreamObserver
) => {
return from(datasourceInstance.query(options, observer));
};
......@@ -51,11 +51,11 @@ import {
} from 'app/core/utils/explore';
import { Emitter } from 'app/core/utils/emitter';
import { ExploreToolbar } from './ExploreToolbar';
import { scanStopAction } from './state/actionTypes';
import { NoDataSourceCallToAction } from './NoDataSourceCallToAction';
import { FadeIn } from 'app/core/components/Animations/FadeIn';
import { getTimeZone } from '../profile/state/selectors';
import { ErrorContainer } from './ErrorContainer';
import { scanStopAction } from './state/actionTypes';
interface ExploreProps {
StartPage?: ComponentClass<ExploreStartPageProps>;
......
......@@ -10,6 +10,7 @@ import {
TimeZone,
TimeRange,
SelectOptionItem,
LoadingState,
} from '@grafana/ui';
import { DataSourcePicker } from 'app/core/components/Select/DataSourcePicker';
import { StoreState } from 'app/types/store';
......@@ -261,9 +262,7 @@ const mapStateToProps = (state: StoreState, { exploreId }: OwnProps): StateProps
exploreDatasources,
range,
refreshInterval,
graphIsLoading,
logIsLoading,
tableIsLoading,
loadingState,
supportedModes,
mode,
isLive,
......@@ -271,8 +270,9 @@ const mapStateToProps = (state: StoreState, { exploreId }: OwnProps): StateProps
const selectedDatasource = datasourceInstance
? exploreDatasources.find(datasource => datasource.name === datasourceInstance.name)
: undefined;
const loading = graphIsLoading || logIsLoading || tableIsLoading;
const hasLiveOption = datasourceInstance && datasourceInstance.convertToStreamTargets ? true : false;
const loading = loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming;
const hasLiveOption =
datasourceInstance && datasourceInstance.meta && datasourceInstance.meta.streaming ? true : false;
const supportedModeOptions: Array<SelectOptionItem<ExploreMode>> = [];
let selectedModeOption = null;
......
import React, { PureComponent } from 'react';
import { hot } from 'react-hot-loader';
import { connect } from 'react-redux';
import { TimeRange, TimeZone, AbsoluteTimeRange } from '@grafana/ui';
import { TimeRange, TimeZone, AbsoluteTimeRange, LoadingState } from '@grafana/ui';
import { ExploreId, ExploreItemState } from 'app/types/explore';
import { StoreState } from 'app/types';
......@@ -69,8 +69,8 @@ function mapStateToProps(state: StoreState, { exploreId }) {
const explore = state.explore;
const { split } = explore;
const item: ExploreItemState = explore[exploreId];
const { graphResult, graphIsLoading, range, showingGraph, showingTable } = item;
const loading = graphIsLoading;
const { graphResult, loadingState, range, showingGraph, showingTable } = item;
const loading = loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming;
return { graphResult, loading, range, showingGraph, showingTable, split, timeZone: getTimeZone(state.user) };
}
......
......@@ -13,6 +13,7 @@ import {
LogsModel,
LogRowModel,
LogsDedupStrategy,
LoadingState,
} from '@grafana/ui';
import { ExploreId, ExploreItemState } from 'app/types/explore';
......@@ -151,14 +152,14 @@ function mapStateToProps(state: StoreState, { exploreId }) {
const {
logsHighlighterExpressions,
logsResult,
logIsLoading,
loadingState,
scanning,
scanRange,
range,
datasourceInstance,
isLive,
} = item;
const loading = logIsLoading;
const loading = loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming;
const { dedupStrategy } = exploreItemUIStateSelector(item);
const hiddenLogLevels = new Set(item.hiddenLogLevels);
const dedupedResult = deduplicatedLogsSelector(item);
......
......@@ -20,7 +20,6 @@ import {
QueryFixAction,
DataSourceStatus,
PanelData,
LoadingState,
DataQueryError,
} from '@grafana/ui';
import { HistoryItem, ExploreItemState, ExploreId } from 'app/types/explore';
......@@ -180,9 +179,7 @@ function mapStateToProps(state: StoreState, { exploreId, index }: QueryRowProps)
range,
datasourceError,
graphResult,
graphIsLoading,
tableIsLoading,
logIsLoading,
loadingState,
latency,
queryErrors,
} = item;
......@@ -190,15 +187,9 @@ function mapStateToProps(state: StoreState, { exploreId, index }: QueryRowProps)
const datasourceStatus = datasourceError ? DataSourceStatus.Disconnected : DataSourceStatus.Connected;
const error = queryErrors.filter(queryError => queryError.refId === query.refId)[0];
const series = graphResult ? graphResult : []; // TODO: use SeriesData
const queryResponseState =
graphIsLoading || tableIsLoading || logIsLoading
? LoadingState.Loading
: error
? LoadingState.Error
: LoadingState.Done;
const queryResponse: PanelData = {
series,
state: queryResponseState,
state: loadingState,
error,
};
......
......@@ -9,6 +9,7 @@ import { toggleTable } from './state/actions';
import Table from './Table';
import Panel from './Panel';
import TableModel from 'app/core/table_model';
import { LoadingState } from '@grafana/ui';
interface TableContainerProps {
exploreId: ExploreId;
......@@ -38,8 +39,11 @@ export class TableContainer extends PureComponent<TableContainerProps> {
function mapStateToProps(state: StoreState, { exploreId }) {
const explore = state.explore;
const item: ExploreItemState = explore[exploreId];
const { tableIsLoading, showingTable, tableResult } = item;
const loading = tableIsLoading;
const { loadingState, showingTable, tableResult } = item;
const loading =
tableResult && tableResult.rows.length > 0
? false
: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming;
return { loading, showingTable, tableResult };
}
......
......@@ -9,18 +9,23 @@ import {
LogLevel,
TimeRange,
DataQueryError,
SeriesData,
LogsModel,
TimeSeries,
DataQueryResponseData,
LoadingState,
} from '@grafana/ui/src/types';
import {
ExploreId,
ExploreItemState,
HistoryItem,
RangeScanner,
ResultType,
QueryTransaction,
ExploreUIState,
ExploreMode,
QueryOptions,
} from 'app/types/explore';
import { actionCreatorFactory, noPayloadActionCreatorFactory, ActionOf } from 'app/core/redux/actionCreatorFactory';
import TableModel from 'app/core/table_model';
/** Higher order actions
*
......@@ -142,21 +147,19 @@ export interface ModifyQueriesPayload {
export interface QueryFailurePayload {
exploreId: ExploreId;
response: DataQueryError;
resultType: ResultType;
}
export interface QueryStartPayload {
exploreId: ExploreId;
resultType: ResultType;
rowIndex: number;
transaction: QueryTransaction;
}
export interface QuerySuccessPayload {
exploreId: ExploreId;
result: any;
resultType: ResultType;
latency: number;
loadingState: LoadingState;
graphResult: TimeSeries[];
tableResult: TableModel;
logsResult: LogsModel;
}
export interface HistoryUpdatedPayload {
......@@ -238,6 +241,41 @@ export interface ResetQueryErrorPayload {
refIds: string[];
}
export interface SetUrlReplacedPayload {
exploreId: ExploreId;
}
export interface ProcessQueryErrorsPayload {
exploreId: ExploreId;
response: any;
datasourceId: string;
}
export interface ProcessQueryResultsPayload {
exploreId: ExploreId;
latency: number;
datasourceId: string;
loadingState: LoadingState;
series?: DataQueryResponseData[];
delta?: SeriesData[];
}
export interface RunQueriesBatchPayload {
exploreId: ExploreId;
queryOptions: QueryOptions;
}
export interface LimitMessageRatePayload {
series: SeriesData[];
exploreId: ExploreId;
datasourceId: string;
}
export interface ChangeRangePayload {
exploreId: ExploreId;
range: TimeRange;
}
/**
* Adds a query row after the row with the given index.
*/
......@@ -333,13 +371,6 @@ export const modifyQueriesAction = actionCreatorFactory<ModifyQueriesPayload>('e
*/
export const queryFailureAction = actionCreatorFactory<QueryFailurePayload>('explore/QUERY_FAILURE').create();
/**
* Start a query transaction for the given result type.
* @param exploreId Explore area
* @param transaction Query options and `done` status.
* @param resultType Associate the transaction with a result viewer, e.g., Graph
* @param rowIndex Index is used to associate latency for this transaction with a query row
*/
export const queryStartAction = actionCreatorFactory<QueryStartPayload>('explore/QUERY_START').create();
/**
......@@ -392,6 +423,7 @@ export const splitCloseAction = actionCreatorFactory<SplitCloseActionPayload>('e
* The copy keeps all query modifications but wipes the query results.
*/
export const splitOpenAction = actionCreatorFactory<SplitOpenPayload>('explore/SPLIT_OPEN').create();
export const stateSaveAction = noPayloadActionCreatorFactory('explore/STATE_SAVE').create();
/**
......@@ -440,6 +472,24 @@ export const historyUpdatedAction = actionCreatorFactory<HistoryUpdatedPayload>(
export const resetQueryErrorAction = actionCreatorFactory<ResetQueryErrorPayload>('explore/RESET_QUERY_ERROR').create();
export const setUrlReplacedAction = actionCreatorFactory<SetUrlReplacedPayload>('explore/SET_URL_REPLACED').create();
export const processQueryErrorsAction = actionCreatorFactory<ProcessQueryErrorsPayload>(
'explore/PROCESS_QUERY_ERRORS'
).create();
export const processQueryResultsAction = actionCreatorFactory<ProcessQueryResultsPayload>(
'explore/PROCESS_QUERY_RESULTS'
).create();
export const runQueriesBatchAction = actionCreatorFactory<RunQueriesBatchPayload>('explore/RUN_QUERIES_BATCH').create();
export const limitMessageRatePayloadAction = actionCreatorFactory<LimitMessageRatePayload>(
'explore/LIMIT_MESSAGE_RATE_PAYLOAD'
).create();
export const changeRangeAction = actionCreatorFactory<ChangeRangePayload>('explore/CHANGE_RANGE').create();
export type HigherOrderAction =
| ActionOf<SplitCloseActionPayload>
| SplitOpenAction
......
import { Epic } from 'redux-observable';
import { NEVER } from 'rxjs';
import { takeUntil, mergeMap, tap, filter, map, throttleTime } from 'rxjs/operators';
import { StoreState, ExploreId } from 'app/types';
import { ActionOf, ActionCreator, actionCreatorFactory } from '../../../core/redux/actionCreatorFactory';
import { config } from '../../../core/config';
import {
updateDatasourceInstanceAction,
resetExploreAction,
changeRefreshIntervalAction,
clearQueriesAction,
} from './actionTypes';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
import { SeriesData } from '@grafana/ui/src/types/data';
import { EpicDependencies } from 'app/store/configureStore';
const convertToWebSocketUrl = (url: string) => {
const protocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
let backend = `${protocol}${window.location.host}${config.appSubUrl}`;
if (backend.endsWith('/')) {
backend = backend.slice(0, backend.length - 1);
}
return `${backend}${url}`;
};
export interface StartSubscriptionsPayload {
exploreId: ExploreId;
dataReceivedActionCreator: ActionCreator<SubscriptionDataReceivedPayload>;
}
export const startSubscriptionsAction = actionCreatorFactory<StartSubscriptionsPayload>(
'explore/START_SUBSCRIPTIONS'
).create();
export interface StartSubscriptionPayload {
url: string;
refId: string;
exploreId: ExploreId;
dataReceivedActionCreator: ActionCreator<SubscriptionDataReceivedPayload>;
}
export const startSubscriptionAction = actionCreatorFactory<StartSubscriptionPayload>(
'explore/START_SUBSCRIPTION'
).create();
export interface SubscriptionDataReceivedPayload {
data: SeriesData;
exploreId: ExploreId;
}
export const subscriptionDataReceivedAction = actionCreatorFactory<SubscriptionDataReceivedPayload>(
'explore/SUBSCRIPTION_DATA_RECEIVED'
).create();
export interface LimitMessageRatePayload {
data: SeriesData;
exploreId: ExploreId;
dataReceivedActionCreator: ActionCreator<SubscriptionDataReceivedPayload>;
}
export const limitMessageRatePayloadAction = actionCreatorFactory<LimitMessageRatePayload>(
'explore/LIMIT_MESSAGE_RATE_PAYLOAD'
).create();
export const startSubscriptionsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
return action$.ofType(startSubscriptionsAction.type).pipe(
mergeMap((action: ActionOf<StartSubscriptionsPayload>) => {
const { exploreId, dataReceivedActionCreator } = action.payload;
const { datasourceInstance, queries, refreshInterval } = state$.value.explore[exploreId];
if (!datasourceInstance || !datasourceInstance.convertToStreamTargets) {
return NEVER; //do nothing if datasource does not support streaming
}
if (!refreshInterval || !isLive(refreshInterval)) {
return NEVER; //do nothing if refresh interval is not 'LIVE'
}
const request: any = { targets: queries };
return datasourceInstance.convertToStreamTargets(request).map(target =>
startSubscriptionAction({
url: convertToWebSocketUrl(target.url),
refId: target.refId,
exploreId,
dataReceivedActionCreator,
})
);
})
);
};
export const startSubscriptionEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies> = (
action$,
state$,
{ getWebSocket }
) => {
return action$.ofType(startSubscriptionAction.type).pipe(
mergeMap((action: ActionOf<StartSubscriptionPayload>) => {
const { url, exploreId, refId, dataReceivedActionCreator } = action.payload;
return getWebSocket(url).pipe(
takeUntil(
action$
.ofType(
startSubscriptionAction.type,
resetExploreAction.type,
updateDatasourceInstanceAction.type,
changeRefreshIntervalAction.type,
clearQueriesAction.type
)
.pipe(
filter(action => {
if (action.type === resetExploreAction.type) {
return true; // stops all subscriptions if user navigates away
}
if (action.type === updateDatasourceInstanceAction.type && action.payload.exploreId === exploreId) {
return true; // stops subscriptions if user changes data source
}
if (action.type === changeRefreshIntervalAction.type && action.payload.exploreId === exploreId) {
return !isLive(action.payload.refreshInterval); // stops subscriptions if user changes refresh interval away from 'Live'
}
if (action.type === clearQueriesAction.type && action.payload.exploreId === exploreId) {
return true; // stops subscriptions if user clears all queries
}
return action.payload.exploreId === exploreId && action.payload.refId === refId;
}),
tap(value => console.log('Stopping subscription', value))
)
),
mergeMap((result: any) => {
const { datasourceInstance } = state$.value.explore[exploreId];
if (!datasourceInstance || !datasourceInstance.resultToSeriesData) {
return [null]; //do nothing if datasource does not support streaming
}
return datasourceInstance
.resultToSeriesData(result, refId)
.map(data => limitMessageRatePayloadAction({ exploreId, data, dataReceivedActionCreator }));
}),
filter(action => action !== null)
);
})
);
};
export const limitMessageRateEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies> = action$ => {
return action$.ofType(limitMessageRatePayloadAction.type).pipe(
throttleTime(1),
map((action: ActionOf<LimitMessageRatePayload>) => {
const { exploreId, data, dataReceivedActionCreator } = action.payload;
return dataReceivedActionCreator({ exploreId, data });
})
);
};
import { Epic } from 'redux-observable';
import { map, throttleTime } from 'rxjs/operators';
import { LoadingState } from '@grafana/ui';
import { StoreState } from 'app/types';
import { ActionOf } from '../../../../core/redux/actionCreatorFactory';
import { limitMessageRatePayloadAction, LimitMessageRatePayload, processQueryResultsAction } from '../actionTypes';
import { EpicDependencies } from 'app/store/configureStore';
export const limitMessageRateEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies> = action$ => {
return action$.ofType(limitMessageRatePayloadAction.type).pipe(
throttleTime(1),
map((action: ActionOf<LimitMessageRatePayload>) => {
const { exploreId, series, datasourceId } = action.payload;
return processQueryResultsAction({
exploreId,
latency: 0,
datasourceId,
loadingState: LoadingState.Streaming,
series: null,
delta: series,
});
})
);
};
import { mockExploreState } from 'test/mocks/mockExploreState';
import { epicTester } from 'test/core/redux/epicTester';
import { processQueryErrorsAction, queryFailureAction } from '../actionTypes';
import { processQueryErrorsEpic } from './processQueryErrorsEpic';
describe('processQueryErrorsEpic', () => {
let originalConsoleError = console.error;
beforeEach(() => {
originalConsoleError = console.error;
console.error = jest.fn();
});
afterEach(() => {
console.error = originalConsoleError;
});
describe('when processQueryErrorsAction is dispatched', () => {
describe('and datasourceInstance is the same', () => {
describe('and the response is not cancelled', () => {
it('then queryFailureAction is dispatched', () => {
const { datasourceId, exploreId, state, eventBridge } = mockExploreState();
const response = { message: 'Something went terribly wrong!' };
epicTester(processQueryErrorsEpic, state)
.whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId, response }))
.thenResultingActionsEqual(queryFailureAction({ exploreId, response }));
expect(console.error).toBeCalledTimes(1);
expect(console.error).toBeCalledWith(response);
expect(eventBridge.emit).toBeCalledTimes(1);
expect(eventBridge.emit).toBeCalledWith('data-error', response);
});
});
describe('and the response is cancelled', () => {
it('then no actions are dispatched', () => {
const { datasourceId, exploreId, state, eventBridge } = mockExploreState();
const response = { cancelled: true, message: 'Something went terribly wrong!' };
epicTester(processQueryErrorsEpic, state)
.whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId, response }))
.thenNoActionsWhereDispatched();
expect(console.error).not.toBeCalled();
expect(eventBridge.emit).not.toBeCalled();
});
});
});
describe('and datasourceInstance is not the same', () => {
describe('and the response is not cancelled', () => {
it('then no actions are dispatched', () => {
const { exploreId, state, eventBridge } = mockExploreState();
const response = { message: 'Something went terribly wrong!' };
epicTester(processQueryErrorsEpic, state)
.whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId: 'other id', response }))
.thenNoActionsWhereDispatched();
expect(console.error).not.toBeCalled();
expect(eventBridge.emit).not.toBeCalled();
});
});
});
});
});
import { Epic } from 'redux-observable';
import { mergeMap } from 'rxjs/operators';
import { NEVER, of } from 'rxjs';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { instanceOfDataQueryError } from 'app/core/utils/explore';
import { toDataQueryError } from 'app/features/dashboard/state/PanelQueryState';
import { processQueryErrorsAction, ProcessQueryErrorsPayload, queryFailureAction } from '../actionTypes';
export const processQueryErrorsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
return action$.ofType(processQueryErrorsAction.type).pipe(
mergeMap((action: ActionOf<ProcessQueryErrorsPayload>) => {
const { exploreId, datasourceId } = action.payload;
let { response } = action.payload;
const { datasourceInstance, eventBridge } = state$.value.explore[exploreId];
if (datasourceInstance.meta.id !== datasourceId || response.cancelled) {
// Navigated away, queries did not matter
return NEVER;
}
// For Angular editors
eventBridge.emit('data-error', response);
console.error(response); // To help finding problems with query syntax
if (!instanceOfDataQueryError(response)) {
response = toDataQueryError(response);
}
return of(
queryFailureAction({
exploreId,
response,
})
);
})
);
};
import { mockExploreState } from 'test/mocks/mockExploreState';
import { epicTester } from 'test/core/redux/epicTester';
import {
processQueryResultsAction,
resetQueryErrorAction,
querySuccessAction,
scanStopAction,
scanRangeAction,
} from '../actionTypes';
import { SeriesData, LoadingState } from '@grafana/ui';
import { processQueryResultsEpic } from './processQueryResultsEpic';
import TableModel from 'app/core/table_model';
const testContext = () => {
const serieA: SeriesData = {
fields: [],
refId: 'A',
rows: [],
};
const serieB: SeriesData = {
fields: [],
refId: 'B',
rows: [],
};
const series = [serieA, serieB];
const latency = 0;
const loadingState = LoadingState.Done;
return {
latency,
series,
loadingState,
};
};
describe('processQueryResultsEpic', () => {
describe('when processQueryResultsAction is dispatched', () => {
describe('and datasourceInstance is the same', () => {
describe('and explore is not scanning', () => {
it('then resetQueryErrorAction and querySuccessAction are dispatched and eventBridge emits correct message', () => {
const { datasourceId, exploreId, state, eventBridge } = mockExploreState();
const { latency, series, loadingState } = testContext();
const graphResult = [];
const tableResult = new TableModel();
const logsResult = null;
epicTester(processQueryResultsEpic, state)
.whenActionIsDispatched(
processQueryResultsAction({ exploreId, datasourceId, loadingState, series, latency })
)
.thenResultingActionsEqual(
resetQueryErrorAction({ exploreId, refIds: ['A', 'B'] }),
querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency })
);
expect(eventBridge.emit).toBeCalledTimes(1);
expect(eventBridge.emit).toBeCalledWith('data-received', series);
});
});
describe('and explore is scanning', () => {
describe('and we have a result', () => {
it('then correct actions are dispatched', () => {
const { datasourceId, exploreId, state } = mockExploreState({ scanning: true });
const { latency, series, loadingState } = testContext();
const graphResult = [];
const tableResult = new TableModel();
const logsResult = null;
epicTester(processQueryResultsEpic, state)
.whenActionIsDispatched(
processQueryResultsAction({ exploreId, datasourceId, loadingState, series, latency })
)
.thenResultingActionsEqual(
resetQueryErrorAction({ exploreId, refIds: ['A', 'B'] }),
querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }),
scanStopAction({ exploreId })
);
});
});
describe('and we do not have a result', () => {
it('then correct actions are dispatched', () => {
const { datasourceId, exploreId, state, scanner } = mockExploreState({ scanning: true });
const { latency, loadingState } = testContext();
const graphResult = [];
const tableResult = new TableModel();
const logsResult = null;
epicTester(processQueryResultsEpic, state)
.whenActionIsDispatched(
processQueryResultsAction({ exploreId, datasourceId, loadingState, series: [], latency })
)
.thenResultingActionsEqual(
resetQueryErrorAction({ exploreId, refIds: [] }),
querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }),
scanRangeAction({ exploreId, range: scanner() })
);
});
});
});
});
describe('and datasourceInstance is not the same', () => {
it('then no actions are dispatched and eventBridge does not emit message', () => {
const { exploreId, state, eventBridge } = mockExploreState();
const { series, loadingState } = testContext();
epicTester(processQueryResultsEpic, state)
.whenActionIsDispatched(
processQueryResultsAction({ exploreId, datasourceId: 'other id', loadingState, series, latency: 0 })
)
.thenNoActionsWhereDispatched();
expect(eventBridge.emit).not.toBeCalled();
});
});
});
});
import _ from 'lodash';
import { Epic } from 'redux-observable';
import { mergeMap } from 'rxjs/operators';
import { NEVER } from 'rxjs';
import { LoadingState } from '@grafana/ui';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { getRefIds } from 'app/core/utils/explore';
import {
processQueryResultsAction,
ProcessQueryResultsPayload,
querySuccessAction,
scanRangeAction,
resetQueryErrorAction,
scanStopAction,
} from '../actionTypes';
import { ResultProcessor } from '../../utils/ResultProcessor';
export const processQueryResultsEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
return action$.ofType(processQueryResultsAction.type).pipe(
mergeMap((action: ActionOf<ProcessQueryResultsPayload>) => {
const { exploreId, datasourceId, latency, loadingState, series, delta } = action.payload;
const { datasourceInstance, scanning, scanner, eventBridge } = state$.value.explore[exploreId];
// If datasource already changed, results do not matter
if (datasourceInstance.meta.id !== datasourceId) {
return NEVER;
}
const result = series || delta || [];
const replacePreviousResults = loadingState === LoadingState.Done && series && !delta ? true : false;
const resultProcessor = new ResultProcessor(state$.value.explore[exploreId], replacePreviousResults, result);
const graphResult = resultProcessor.getGraphResult();
const tableResult = resultProcessor.getTableResult();
const logsResult = resultProcessor.getLogsResult();
const refIds = getRefIds(result);
const actions: Array<ActionOf<any>> = [];
// For Angular editors
eventBridge.emit('data-received', resultProcessor.getRawData());
// Clears any previous errors that now have a successful query, important so Angular editors are updated correctly
actions.push(
resetQueryErrorAction({
exploreId,
refIds,
})
);
actions.push(
querySuccessAction({
exploreId,
latency,
loadingState,
graphResult,
tableResult,
logsResult,
})
);
// Keep scanning for results if this was the last scanning transaction
if (scanning) {
if (_.size(result) === 0) {
const range = scanner();
actions.push(scanRangeAction({ exploreId, range }));
} else {
// We can stop scanning if we have a result
actions.push(scanStopAction({ exploreId }));
}
}
return actions;
})
);
};
import { Epic } from 'redux-observable';
import { Observable, Subject } from 'rxjs';
import { mergeMap, catchError, takeUntil, filter } from 'rxjs/operators';
import _, { isString } from 'lodash';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
import { DataStreamState, LoadingState, DataQueryResponse, SeriesData, DataQueryResponseData } from '@grafana/ui';
import * as dateMath from '@grafana/ui/src/utils/datemath';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { buildQueryTransaction, updateHistory } from 'app/core/utils/explore';
import {
clearQueriesAction,
historyUpdatedAction,
resetExploreAction,
updateDatasourceInstanceAction,
changeRefreshIntervalAction,
processQueryErrorsAction,
processQueryResultsAction,
runQueriesBatchAction,
RunQueriesBatchPayload,
queryStartAction,
limitMessageRatePayloadAction,
stateSaveAction,
changeRangeAction,
} from '../actionTypes';
import { ExploreId, ExploreItemState } from 'app/types';
const publishActions = (outerObservable: Subject<any>, actions: Array<ActionOf<any>>) => {
for (const action of actions) {
outerObservable.next(action);
}
};
interface ProcessResponseConfig {
exploreId: ExploreId;
exploreItemState: ExploreItemState;
datasourceId: string;
now: number;
loadingState: LoadingState;
series?: DataQueryResponseData[];
delta?: SeriesData[];
}
const processResponse = (config: ProcessResponseConfig) => {
const { exploreId, exploreItemState, datasourceId, now, loadingState, series, delta } = config;
const { queries, history } = exploreItemState;
const latency = Date.now() - now;
// Side-effect: Saving history in localstorage
const nextHistory = updateHistory(history, datasourceId, queries);
return [
historyUpdatedAction({ exploreId, history: nextHistory }),
processQueryResultsAction({ exploreId, latency, datasourceId, loadingState, series, delta }),
stateSaveAction(),
];
};
interface ProcessErrorConfig {
exploreId: ExploreId;
datasourceId: string;
error: any;
}
const processError = (config: ProcessErrorConfig) => {
const { exploreId, datasourceId, error } = config;
return [processQueryErrorsAction({ exploreId, response: error, datasourceId })];
};
export const runQueriesBatchEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (
action$,
state$,
{ getQueryResponse }
) => {
return action$.ofType(runQueriesBatchAction.type).pipe(
mergeMap((action: ActionOf<RunQueriesBatchPayload>) => {
const { exploreId, queryOptions } = action.payload;
const exploreItemState = state$.value.explore[exploreId];
const { datasourceInstance, queries, queryIntervals, range, scanning } = exploreItemState;
// Create an observable per run queries action
// Within the observable create two subscriptions
// First subscription: 'querySubscription' subscribes to the call to query method on datasourceinstance
// Second subscription: 'streamSubscription' subscribes to events from the query methods observer callback
const observable: Observable<ActionOf<any>> = Observable.create((outerObservable: Subject<any>) => {
const datasourceId = datasourceInstance.meta.id;
const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning);
outerObservable.next(queryStartAction({ exploreId }));
const now = Date.now();
let datasourceUnsubscribe: Function = null;
const streamHandler = new Subject<DataStreamState>();
const observer = (event: DataStreamState) => {
datasourceUnsubscribe = event.unsubscribe;
if (!streamHandler.closed) {
// their might be a race condition when unsubscribing
streamHandler.next(event);
}
};
// observer subscription, handles datasourceInstance.query observer events and pushes that forward
const streamSubscription = streamHandler.subscribe({
next: event => {
const { state, error, series, delta } = event;
if (!series && !delta && !error) {
return;
}
if (state === LoadingState.Error) {
const actions = processError({ exploreId, datasourceId, error });
publishActions(outerObservable, actions);
}
if (state === LoadingState.Streaming) {
if (event.request && event.request.range) {
let newRange = event.request.range;
if (isString(newRange.raw.from)) {
newRange = {
from: dateMath.parse(newRange.raw.from, false),
to: dateMath.parse(newRange.raw.to, true),
raw: newRange.raw,
};
}
outerObservable.next(changeRangeAction({ exploreId, range: newRange }));
}
outerObservable.next(
limitMessageRatePayloadAction({
exploreId,
series: delta,
datasourceId,
})
);
}
if (state === LoadingState.Done || state === LoadingState.Loading) {
const actions = processResponse({
exploreId,
exploreItemState,
datasourceId,
now,
loadingState: state,
series: null,
delta,
});
publishActions(outerObservable, actions);
}
},
});
// query subscription, handles datasourceInstance.query response and pushes that forward
const querySubscription = getQueryResponse(datasourceInstance, transaction.options, observer)
.pipe(
mergeMap((response: DataQueryResponse) => {
return processResponse({
exploreId,
exploreItemState,
datasourceId,
now,
loadingState: LoadingState.Done,
series: response && response.data ? response.data : [],
delta: null,
});
}),
catchError(error => {
return processError({ exploreId, datasourceId, error });
})
)
.subscribe({ next: (action: ActionOf<any>) => outerObservable.next(action) });
// this unsubscribe method will be called when any of the takeUntil actions below happen
const unsubscribe = () => {
if (datasourceUnsubscribe) {
datasourceUnsubscribe();
}
querySubscription.unsubscribe();
streamSubscription.unsubscribe();
streamHandler.unsubscribe();
outerObservable.unsubscribe();
};
return unsubscribe;
});
return observable.pipe(
takeUntil(
action$
.ofType(
runQueriesBatchAction.type,
resetExploreAction.type,
updateDatasourceInstanceAction.type,
changeRefreshIntervalAction.type,
clearQueriesAction.type
)
.pipe(
filter(action => {
if (action.type === resetExploreAction.type) {
return true; // stops all subscriptions if user navigates away
}
if (action.type === updateDatasourceInstanceAction.type && action.payload.exploreId === exploreId) {
return true; // stops subscriptions if user changes data source
}
if (action.type === changeRefreshIntervalAction.type && action.payload.exploreId === exploreId) {
return !isLive(action.payload.refreshInterval); // stops subscriptions if user changes refresh interval away from 'Live'
}
if (action.type === clearQueriesAction.type && action.payload.exploreId === exploreId) {
return true; // stops subscriptions if user clears all queries
}
return action.payload.exploreId === exploreId;
})
)
)
);
})
);
};
import { mockExploreState } from 'test/mocks/mockExploreState';
import { epicTester } from 'test/core/redux/epicTester';
import { runQueriesAction, stateSaveAction, runQueriesBatchAction, clearQueriesAction } from '../actionTypes';
import { runQueriesEpic } from './runQueriesEpic';
describe('runQueriesEpic', () => {
describe('when runQueriesAction is dispatched', () => {
describe('and there is no datasourceError', () => {
describe('and we have non empty queries', () => {
describe('and explore is not live', () => {
it('then runQueriesBatchAction and stateSaveAction are dispatched', () => {
const queries = [{ refId: 'A', key: '123456', expr: '{__filename__="some.log"}' }];
const { exploreId, state, datasourceInterval, containerWidth } = mockExploreState({ queries });
epicTester(runQueriesEpic, state)
.whenActionIsDispatched(runQueriesAction({ exploreId, range: null }))
.thenResultingActionsEqual(
runQueriesBatchAction({
exploreId,
queryOptions: { interval: datasourceInterval, maxDataPoints: containerWidth, live: false },
})
);
});
});
describe('and explore is live', () => {
it('then runQueriesBatchAction and stateSaveAction are dispatched', () => {
const queries = [{ refId: 'A', key: '123456', expr: '{__filename__="some.log"}' }];
const { exploreId, state, datasourceInterval, containerWidth } = mockExploreState({
queries,
isLive: true,
streaming: true,
});
epicTester(runQueriesEpic, state)
.whenActionIsDispatched(runQueriesAction({ exploreId, range: null }))
.thenResultingActionsEqual(
runQueriesBatchAction({
exploreId,
queryOptions: { interval: datasourceInterval, maxDataPoints: containerWidth, live: true },
})
);
});
});
});
describe('and we have no queries', () => {
it('then clearQueriesAction and stateSaveAction are dispatched', () => {
const queries = [];
const { exploreId, state } = mockExploreState({ queries });
epicTester(runQueriesEpic, state)
.whenActionIsDispatched(runQueriesAction({ exploreId, range: null }))
.thenResultingActionsEqual(clearQueriesAction({ exploreId }), stateSaveAction());
});
});
});
describe('and there is a datasourceError', () => {
it('then no actions are dispatched', () => {
const { exploreId, state } = mockExploreState({
datasourceError: { message: 'Some error' },
});
epicTester(runQueriesEpic, state)
.whenActionIsDispatched(runQueriesAction({ exploreId, range: null }))
.thenNoActionsWhereDispatched();
});
});
});
});
import { Epic } from 'redux-observable';
import { NEVER } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { hasNonEmptyQuery } from 'app/core/utils/explore';
import {
clearQueriesAction,
runQueriesAction,
RunQueriesPayload,
runQueriesBatchAction,
stateSaveAction,
} from '../actionTypes';
export const runQueriesEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
return action$.ofType(runQueriesAction.type).pipe(
mergeMap((action: ActionOf<RunQueriesPayload>) => {
const { exploreId } = action.payload;
const { datasourceInstance, queries, datasourceError, containerWidth, isLive } = state$.value.explore[exploreId];
if (datasourceError) {
// let's not run any queries if data source is in a faulty state
return NEVER;
}
if (!hasNonEmptyQuery(queries)) {
return [clearQueriesAction({ exploreId }), stateSaveAction()]; // Remember to save to state and update location
}
// Some datasource's query builders allow per-query interval limits,
// but we're using the datasource interval limit for now
const interval = datasourceInstance.interval;
const live = isLive;
return [runQueriesBatchAction({ exploreId, queryOptions: { interval, maxDataPoints: containerWidth, live } })];
})
);
};
import { epicTester } from 'test/core/redux/epicTester';
import { stateSaveEpic } from './stateSaveEpic';
import { stateSaveAction, setUrlReplacedAction } from '../actionTypes';
import { updateLocation } from 'app/core/actions/location';
import { mockExploreState } from 'test/mocks/mockExploreState';
describe('stateSaveEpic', () => {
describe('when stateSaveAction is dispatched', () => {
describe('and there is a left state', () => {
describe('and no split', () => {
it('then the correct actions are dispatched', () => {
const { exploreId, state } = mockExploreState();
epicTester(stateSaveEpic, state)
.whenActionIsDispatched(stateSaveAction())
.thenResultingActionsEqual(
updateLocation({
query: { left: '["now-6h","now","test",{"ui":[true,true,true,null]}]' },
replace: true,
}),
setUrlReplacedAction({ exploreId })
);
});
});
describe('and explore is splitted', () => {
it('then the correct actions are dispatched', () => {
const { exploreId, state } = mockExploreState({ split: true });
epicTester(stateSaveEpic, state)
.whenActionIsDispatched(stateSaveAction())
.thenResultingActionsEqual(
updateLocation({
query: {
left: '["now-6h","now","test",{"ui":[true,true,true,null]}]',
right: '["now-6h","now","test",{"ui":[true,true,true,null]}]',
},
replace: true,
}),
setUrlReplacedAction({ exploreId })
);
});
});
});
describe('and urlReplaced is true', () => {
it('then setUrlReplacedAction should not be dispatched', () => {
const { state } = mockExploreState({ urlReplaced: true });
epicTester(stateSaveEpic, state)
.whenActionIsDispatched(stateSaveAction())
.thenResultingActionsEqual(
updateLocation({
query: { left: '["now-6h","now","test",{"ui":[true,true,true,null]}]' },
replace: false,
})
);
});
});
});
});
import { Epic } from 'redux-observable';
import { mergeMap } from 'rxjs/operators';
import { RawTimeRange, TimeRange } from '@grafana/ui/src/types/time';
import { isDateTime } from '@grafana/ui/src/utils/moment_wrapper';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
import { ExploreUrlState, ExploreId } from 'app/types/explore';
import { clearQueryKeys, serializeStateToUrlParam } from 'app/core/utils/explore';
import { updateLocation } from 'app/core/actions/location';
import { setUrlReplacedAction, stateSaveAction } from '../actionTypes';
const toRawTimeRange = (range: TimeRange): RawTimeRange => {
let from = range.raw.from;
if (isDateTime(from)) {
from = from.valueOf().toString(10);
}
let to = range.raw.to;
if (isDateTime(to)) {
to = to.valueOf().toString(10);
}
return {
from,
to,
};
};
export const stateSaveEpic: Epic<ActionOf<any>, ActionOf<any>, StoreState> = (action$, state$) => {
return action$.ofType(stateSaveAction.type).pipe(
mergeMap(() => {
const { left, right, split } = state$.value.explore;
const replace = left && left.urlReplaced === false;
const urlStates: { [index: string]: string } = {};
const leftUrlState: ExploreUrlState = {
datasource: left.datasourceInstance.name,
queries: left.queries.map(clearQueryKeys),
range: toRawTimeRange(left.range),
ui: {
showingGraph: left.showingGraph,
showingLogs: true,
showingTable: left.showingTable,
dedupStrategy: left.dedupStrategy,
},
};
urlStates.left = serializeStateToUrlParam(leftUrlState, true);
if (split) {
const rightUrlState: ExploreUrlState = {
datasource: right.datasourceInstance.name,
queries: right.queries.map(clearQueryKeys),
range: toRawTimeRange(right.range),
ui: {
showingGraph: right.showingGraph,
showingLogs: true,
showingTable: right.showingTable,
dedupStrategy: right.dedupStrategy,
},
};
urlStates.right = serializeStateToUrlParam(rightUrlState, true);
}
const actions: Array<ActionOf<any>> = [updateLocation({ query: urlStates, replace })];
if (replace) {
actions.push(setUrlReplacedAction({ exploreId: ExploreId.left }));
}
return actions;
})
);
};
......@@ -17,7 +17,6 @@ import {
import { reducerTester } from 'test/core/redux/reducerTester';
import {
scanStartAction,
scanStopAction,
testDataSourcePendingAction,
testDataSourceSuccessAction,
testDataSourceFailureAction,
......@@ -25,6 +24,7 @@ import {
splitOpenAction,
splitCloseAction,
changeModeAction,
scanStopAction,
runQueriesAction,
} from './actionTypes';
import { Reducer } from 'redux';
......@@ -32,7 +32,7 @@ import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { updateLocation } from 'app/core/actions/location';
import { serializeStateToUrlParam } from 'app/core/utils/explore';
import TableModel from 'app/core/table_model';
import { DataSourceApi, DataQuery, LogsModel, LogsDedupStrategy, dateTime } from '@grafana/ui';
import { DataSourceApi, DataQuery, LogsModel, LogsDedupStrategy, LoadingState, dateTime } from '@grafana/ui';
describe('Explore item reducer', () => {
describe('scanning', () => {
......@@ -166,9 +166,7 @@ describe('Explore item reducer', () => {
queryKeys,
supportedModes: [ExploreMode.Metrics, ExploreMode.Logs],
mode: ExploreMode.Metrics,
graphIsLoading: false,
tableIsLoading: false,
logIsLoading: false,
loadingState: LoadingState.NotStarted,
latency: 0,
queryErrors: [],
};
......
import _ from 'lodash';
import {
calculateResultsFromQueryTransactions,
getIntervals,
ensureQueries,
getQueryKeys,
......@@ -10,7 +9,7 @@ import {
sortLogsResult,
} from 'app/core/utils/explore';
import { ExploreItemState, ExploreState, ExploreId, ExploreUpdateState, ExploreMode } from 'app/types/explore';
import { DataQuery, LogsModel } from '@grafana/ui';
import { DataQuery, LoadingState } from '@grafana/ui';
import {
HigherOrderAction,
ActionTypes,
......@@ -20,10 +19,17 @@ import {
splitCloseAction,
SplitCloseActionPayload,
loadExploreDatasources,
runQueriesAction,
historyUpdatedAction,
resetQueryErrorAction,
changeModeAction,
queryFailureAction,
setUrlReplacedAction,
querySuccessAction,
scanRangeAction,
scanStopAction,
resetQueryErrorAction,
queryStartAction,
runQueriesAction,
changeRangeAction,
} from './actionTypes';
import { reducerFactory } from 'app/core/redux';
import {
......@@ -40,13 +46,8 @@ import {
loadDatasourcePendingAction,
loadDatasourceReadyAction,
modifyQueriesAction,
queryFailureAction,
queryStartAction,
querySuccessAction,
removeQueryRowAction,
scanRangeAction,
scanStartAction,
scanStopAction,
setQueriesAction,
toggleTableAction,
queriesImportedAction,
......@@ -57,8 +58,6 @@ import { updateLocation } from 'app/core/actions/location';
import { LocationUpdate } from 'app/types';
import TableModel from 'app/core/table_model';
import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker';
import { subscriptionDataReceivedAction, startSubscriptionAction } from './epics';
import { seriesDataToLogsModel } from 'app/core/logs_model';
export const DEFAULT_RANGE = {
from: 'now-6h',
......@@ -100,9 +99,7 @@ export const makeExploreItemState = (): ExploreItemState => ({
scanRange: null,
showingGraph: true,
showingTable: true,
graphIsLoading: false,
logIsLoading: false,
tableIsLoading: false,
loadingState: LoadingState.NotStarted,
queryKeys: [],
urlState: null,
update: makeInitialUpdateState(),
......@@ -111,6 +108,7 @@ export const makeExploreItemState = (): ExploreItemState => ({
supportedModes: [],
mode: null,
isLive: false,
urlReplaced: false,
});
/**
......@@ -191,10 +189,8 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
return {
...state,
refreshInterval: refreshInterval,
graphIsLoading: live ? true : false,
tableIsLoading: live ? true : false,
logIsLoading: live ? true : false,
refreshInterval,
loadingState: live ? LoadingState.Streaming : LoadingState.NotStarted,
isLive: live,
logsResult,
};
......@@ -267,9 +263,7 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
datasourceInstance,
queryErrors: [],
latency: 0,
graphIsLoading: false,
logIsLoading: false,
tableIsLoading: false,
loadingState: LoadingState.NotStarted,
StartPage,
showingStartPage: Boolean(StartPage),
queryKeys: getQueryKeys(state.queries, datasourceInstance),
......@@ -346,35 +340,29 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
.addMapper({
filter: queryFailureAction,
mapper: (state, action): ExploreItemState => {
const { resultType, response } = action.payload;
const { response } = action.payload;
const queryErrors = state.queryErrors.concat(response);
return {
...state,
graphResult: resultType === 'Graph' ? null : state.graphResult,
tableResult: resultType === 'Table' ? null : state.tableResult,
logsResult: resultType === 'Logs' ? null : state.logsResult,
graphResult: null,
tableResult: null,
logsResult: null,
latency: 0,
queryErrors,
graphIsLoading: resultType === 'Graph' ? false : state.graphIsLoading,
logIsLoading: resultType === 'Logs' ? false : state.logIsLoading,
tableIsLoading: resultType === 'Table' ? false : state.tableIsLoading,
loadingState: LoadingState.Error,
update: makeInitialUpdateState(),
};
},
})
.addMapper({
filter: queryStartAction,
mapper: (state, action): ExploreItemState => {
const { resultType } = action.payload;
mapper: (state): ExploreItemState => {
return {
...state,
queryErrors: [],
latency: 0,
graphIsLoading: resultType === 'Graph' ? true : state.graphIsLoading,
logIsLoading: resultType === 'Logs' ? true : state.logIsLoading,
tableIsLoading: resultType === 'Table' ? true : state.tableIsLoading,
loadingState: LoadingState.Loading,
update: makeInitialUpdateState(),
};
},
......@@ -382,81 +370,21 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
.addMapper({
filter: querySuccessAction,
mapper: (state, action): ExploreItemState => {
const { queryIntervals, refreshInterval } = state;
const { result, resultType, latency } = action.payload;
const results = calculateResultsFromQueryTransactions(result, resultType, queryIntervals.intervalMs);
const live = isLive(refreshInterval);
if (live) {
return state;
}
return {
...state,
graphResult: resultType === 'Graph' ? results.graphResult : state.graphResult,
tableResult: resultType === 'Table' ? results.tableResult : state.tableResult,
logsResult:
resultType === 'Logs'
? sortLogsResult(results.logsResult, refreshInterval)
: sortLogsResult(state.logsResult, refreshInterval),
latency,
graphIsLoading: live ? true : false,
logIsLoading: live ? true : false,
tableIsLoading: live ? true : false,
showingStartPage: false,
update: makeInitialUpdateState(),
};
},
})
.addMapper({
filter: startSubscriptionAction,
mapper: (state): ExploreItemState => {
const logsResult = sortLogsResult(state.logsResult, state.refreshInterval);
const { latency, loadingState, graphResult, tableResult, logsResult } = action.payload;
return {
...state,
loadingState,
graphResult,
tableResult,
logsResult,
graphIsLoading: true,
logIsLoading: true,
tableIsLoading: true,
latency,
showingStartPage: false,
update: makeInitialUpdateState(),
};
},
})
.addMapper({
filter: subscriptionDataReceivedAction,
mapper: (state, action): ExploreItemState => {
const { queryIntervals, refreshInterval } = state;
const { data } = action.payload;
const live = isLive(refreshInterval);
if (!live) {
return state;
}
const newResults = seriesDataToLogsModel([data], queryIntervals.intervalMs);
const rowsInState = sortLogsResult(state.logsResult, state.refreshInterval).rows;
const processedRows = [];
for (const row of rowsInState) {
processedRows.push({ ...row, fresh: false });
}
for (const row of newResults.rows) {
processedRows.push({ ...row, fresh: true });
}
const rows = processedRows.slice(processedRows.length - 1000, 1000);
const logsResult: LogsModel = state.logsResult ? { ...state.logsResult, rows } : { hasUniqueLabels: false, rows };
return {
...state,
logsResult,
};
},
})
.addMapper({
filter: removeQueryRowAction,
mapper: (state, action): ExploreItemState => {
const { queries, queryKeys } = state;
......@@ -635,6 +563,24 @@ export const itemReducer = reducerFactory<ExploreItemState>({} as ExploreItemSta
};
},
})
.addMapper({
filter: setUrlReplacedAction,
mapper: (state): ExploreItemState => {
return {
...state,
urlReplaced: true,
};
},
})
.addMapper({
filter: changeRangeAction,
mapper: (state, action): ExploreItemState => {
return {
...state,
range: action.payload.range,
};
},
})
.create();
export const updateChildRefreshState = (
......
import {
DataQueryResponse,
TableData,
isTableData,
LogsModel,
toSeriesData,
guessFieldTypes,
DataQueryResponseData,
TimeSeries,
} from '@grafana/ui';
import { ExploreItemState, ExploreMode } from 'app/types/explore';
import { getProcessedSeriesData } from 'app/features/dashboard/state/PanelQueryState';
import TableModel, { mergeTablesIntoModel } from 'app/core/table_model';
import { sortLogsResult } from 'app/core/utils/explore';
import { seriesDataToLogsModel } from 'app/core/logs_model';
import { default as TimeSeries2 } from 'app/core/time_series2';
import { DataProcessor } from 'app/plugins/panel/graph/data_processor';
export class ResultProcessor {
private rawData: DataQueryResponseData[] = [];
private metrics: TimeSeries[] = [];
private tables: TableData[] = [];
constructor(
private state: ExploreItemState,
private replacePreviousResults: boolean,
result?: DataQueryResponse | DataQueryResponseData[]
) {
if (result && result.hasOwnProperty('data')) {
this.rawData = (result as DataQueryResponse).data;
} else {
this.rawData = (result as DataQueryResponseData[]) || [];
}
if (this.state.mode !== ExploreMode.Metrics) {
return;
}
for (let index = 0; index < this.rawData.length; index++) {
const res: any = this.rawData[index];
const isTable = isTableData(res);
if (isTable) {
this.tables.push(res);
} else {
this.metrics.push(res);
}
}
}
getRawData = (): any[] => {
return this.rawData;
};
getGraphResult = (): TimeSeries[] => {
if (this.state.mode !== ExploreMode.Metrics) {
return [];
}
const newResults = this.makeTimeSeriesList(this.metrics);
return this.mergeGraphResults(newResults, this.state.graphResult);
};
getTableResult = (): TableModel => {
if (this.state.mode !== ExploreMode.Metrics) {
return new TableModel();
}
const prevTableResults = this.state.tableResult || [];
const tablesToMerge = this.replacePreviousResults ? this.tables : [].concat(prevTableResults, this.tables);
return mergeTablesIntoModel(new TableModel(), ...tablesToMerge);
};
getLogsResult = (): LogsModel => {
if (this.state.mode !== ExploreMode.Logs) {
return null;
}
const graphInterval = this.state.queryIntervals.intervalMs;
const seriesData = this.rawData.map(result => guessFieldTypes(toSeriesData(result)));
const newResults = this.rawData ? seriesDataToLogsModel(seriesData, graphInterval) : null;
if (this.replacePreviousResults) {
return newResults;
}
const prevLogsResult: LogsModel = this.state.logsResult || { hasUniqueLabels: false, rows: [] };
const sortedLogResult = sortLogsResult(prevLogsResult, this.state.refreshInterval);
const rowsInState = sortedLogResult.rows;
const seriesInState = sortedLogResult.series || [];
const processedRows = [];
for (const row of rowsInState) {
processedRows.push({ ...row, fresh: false });
}
for (const row of newResults.rows) {
processedRows.push({ ...row, fresh: true });
}
const processedSeries = this.mergeGraphResults(newResults.series, seriesInState);
const slice = -1000;
const rows = processedRows.slice(slice);
const series = processedSeries.slice(slice);
return { ...newResults, rows, series };
};
private makeTimeSeriesList = (rawData: any[]) => {
const dataList = getProcessedSeriesData(rawData);
const dataProcessor = new DataProcessor({ xaxis: {}, aliasColors: [] }); // Hack before we use GraphSeriesXY instead
const timeSeries = dataProcessor.getSeriesList({ dataList });
return (timeSeries as any) as TimeSeries[]; // Hack before we use GraphSeriesXY instead
};
private isSameTimeSeries = (a: TimeSeries | TimeSeries2, b: TimeSeries | TimeSeries2) => {
if (a.hasOwnProperty('id') && b.hasOwnProperty('id')) {
if (a['id'] !== undefined && b['id'] !== undefined && a['id'] === b['id']) {
return true;
}
}
if (a.hasOwnProperty('alias') && b.hasOwnProperty('alias')) {
if (a['alias'] !== undefined && b['alias'] !== undefined && a['alias'] === b['alias']) {
return true;
}
}
return false;
};
private mergeGraphResults = (
newResults: TimeSeries[] | TimeSeries2[],
prevResults: TimeSeries[] | TimeSeries2[]
): TimeSeries[] => {
if (!prevResults || prevResults.length === 0 || this.replacePreviousResults) {
return (newResults as any) as TimeSeries[]; // Hack before we use GraphSeriesXY instead
}
const results: TimeSeries[] = prevResults.slice() as TimeSeries[];
// update existing results
for (let index = 0; index < results.length; index++) {
const prevResult = results[index];
for (const newResult of newResults) {
const isSame = this.isSameTimeSeries(prevResult, newResult);
if (isSame) {
prevResult.datapoints = prevResult.datapoints.concat(newResult.datapoints);
break;
}
}
}
// add new results
for (const newResult of newResults) {
let isNew = true;
for (const prevResult of results) {
const isSame = this.isSameTimeSeries(prevResult, newResult);
if (isSame) {
isNew = false;
break;
}
}
if (isNew) {
const timeSeries2Result = new TimeSeries2({ ...newResult });
const result = (timeSeries2Result as any) as TimeSeries; // Hack before we use GraphSeriesXY instead
results.push(result);
}
}
return results;
};
}
// Libraries
import _ from 'lodash';
import { Subscription, of } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';
import { catchError, map } from 'rxjs/operators';
// Services & Utils
import * as dateMath from '@grafana/ui/src/utils/datemath';
......@@ -17,11 +20,14 @@ import {
DataSourceInstanceSettings,
DataQueryError,
LogRowModel,
DataStreamObserver,
LoadingState,
DataStreamState,
} from '@grafana/ui';
import { LokiQuery, LokiOptions } from './types';
import { BackendSrv } from 'app/core/services/backend_srv';
import { TemplateSrv } from 'app/features/templating/template_srv';
import { safeStringifyValue } from 'app/core/utils/explore';
import { safeStringifyValue, convertToWebSocketUrl } from 'app/core/utils/explore';
export const DEFAULT_MAX_LINES = 1000;
......@@ -47,6 +53,7 @@ interface LokiContextQueryOptions {
}
export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
private subscriptions: { [key: string]: Subscription } = null;
languageProvider: LanguageProvider;
maxLines: number;
......@@ -60,6 +67,7 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
this.languageProvider = new LanguageProvider(this);
const settingsData = instanceSettings.jsonData || {};
this.maxLines = parseInt(settingsData.maxLines, 10) || DEFAULT_MAX_LINES;
this.subscriptions = {};
}
_request(apiUrl: string, data?, options?: any) {
......@@ -73,42 +81,21 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
return this.backendSrv.datasourceRequest(req);
}
convertToStreamTargets = (options: DataQueryRequest<LokiQuery>): Array<{ url: string; refId: string }> => {
return options.targets
.filter(target => target.expr && !target.hide)
.map(target => {
prepareLiveTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>) {
const interpolated = this.templateSrv.replace(target.expr);
const { query, regexp } = parseQuery(interpolated);
const refId = target.refId;
const baseUrl = this.instanceSettings.url;
const params = serializeParams({ query, regexp });
const url = `${baseUrl}/api/prom/tail?${params}`;
const url = convertToWebSocketUrl(`${baseUrl}/api/prom/tail?${params}`);
return {
query,
regexp,
url,
refId,
};
});
};
resultToSeriesData = (data: any, refId: string): SeriesData[] => {
const toSeriesData = (stream: any, refId: string) => ({
...logStreamToSeriesData(stream),
refId,
});
if (data.streams) {
// new Loki API purposed in https://github.com/grafana/loki/pull/590
const series: SeriesData[] = [];
for (const stream of data.streams || []) {
series.push(toSeriesData(stream, refId));
}
return series;
}
return [toSeriesData(data, refId)];
};
prepareQueryTarget(target: LokiQuery, options: DataQueryRequest<LokiQuery>) {
const interpolated = this.templateSrv.replace(target.expr);
const { query, regexp } = parseQuery(interpolated);
......@@ -126,21 +113,15 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
};
}
async query(options: DataQueryRequest<LokiQuery>) {
const queryTargets = options.targets
.filter(target => target.expr && !target.hide)
.map(target => this.prepareQueryTarget(target, options));
if (queryTargets.length === 0) {
return Promise.resolve({ data: [] });
}
const queries = queryTargets.map(target =>
this._request('/api/prom/query', target).catch((err: any) => {
if (err.cancelled) {
return err;
unsubscribe = (refId: string) => {
const subscription = this.subscriptions[refId];
if (subscription && !subscription.closed) {
subscription.unsubscribe();
delete this.subscriptions[refId];
}
};
processError = (err: any, target: any): DataQueryError => {
const error: DataQueryError = {
message: 'Unknown error during query transaction. Please check JS console logs.',
refId: target.refId,
......@@ -161,33 +142,112 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
error.status = err.status;
error.statusText = err.statusText;
return error;
};
processResult = (data: any, target: any): SeriesData[] => {
const series: SeriesData[] = [];
if (Object.keys(data).length === 0) {
return series;
}
if (!data.streams) {
return [{ ...logStreamToSeriesData(data), refId: target.refId }];
}
for (const stream of data.streams || []) {
const seriesData = logStreamToSeriesData(stream);
seriesData.refId = target.refId;
seriesData.meta = {
searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.query, target.regexp)),
limit: this.maxLines,
};
series.push(seriesData);
}
return series;
};
runLiveQueries = (options: DataQueryRequest<LokiQuery>, observer?: DataStreamObserver) => {
const liveTargets = options.targets
.filter(target => target.expr && !target.hide && target.live)
.map(target => this.prepareLiveTarget(target, options));
for (const liveTarget of liveTargets) {
const subscription = webSocket(liveTarget.url)
.pipe(
map((results: any[]) => {
const delta = this.processResult(results, liveTarget);
const state: DataStreamState = {
key: `loki-${liveTarget.refId}`,
request: options,
state: LoadingState.Streaming,
delta,
unsubscribe: () => this.unsubscribe(liveTarget.refId),
};
return state;
}),
catchError(err => {
const error = this.processError(err, liveTarget);
const state: DataStreamState = {
key: `loki-${liveTarget.refId}`,
request: options,
state: LoadingState.Error,
error,
unsubscribe: () => this.unsubscribe(liveTarget.refId),
};
return of(state);
})
)
.subscribe({
next: state => observer(state),
});
this.subscriptions[liveTarget.refId] = subscription;
}
};
runQueries = async (options: DataQueryRequest<LokiQuery>) => {
const queryTargets = options.targets
.filter(target => target.expr && !target.hide && !target.live)
.map(target => this.prepareQueryTarget(target, options));
if (queryTargets.length === 0) {
return Promise.resolve({ data: [] });
}
const queries = queryTargets.map(target =>
this._request('/api/prom/query', target).catch((err: any) => {
if (err.cancelled) {
return err;
}
const error: DataQueryError = this.processError(err, target);
throw error;
})
);
return Promise.all(queries).then((results: any[]) => {
const series: Array<SeriesData | DataQueryError> = [];
let series: SeriesData[] = [];
for (let i = 0; i < results.length; i++) {
const result = results[i];
if (result.data) {
const refId = queryTargets[i].refId;
for (const stream of result.data.streams || []) {
const seriesData = logStreamToSeriesData(stream);
seriesData.refId = refId;
seriesData.meta = {
searchWords: getHighlighterExpressionsFromQuery(
formatQuery(queryTargets[i].query, queryTargets[i].regexp)
),
limit: this.maxLines,
};
series.push(seriesData);
}
series = series.concat(this.processResult(result.data, queryTargets[i]));
}
}
return { data: series };
});
};
async query(options: DataQueryRequest<LokiQuery>, observer?: DataStreamObserver) {
this.runLiveQueries(options, observer);
return this.runQueries(options);
}
async importQueries(queries: LokiQuery[], originMeta: PluginMeta): Promise<LokiQuery[]> {
......
......@@ -16,6 +16,7 @@ import {
} from 'app/types/explore';
import { LokiQuery } from './types';
import { dateTime } from '@grafana/ui/src/utils/moment_wrapper';
import { PromQuery } from '../prometheus/types';
const DEFAULT_KEYS = ['job', 'namespace'];
const EMPTY_SELECTOR = '{}';
......@@ -168,8 +169,9 @@ export default class LokiLanguageProvider extends LanguageProvider {
return Promise.all(
queries.map(async query => {
const expr = await this.importPrometheusQuery(query.expr);
const { context, ...rest } = query as PromQuery;
return {
...query,
...rest,
expr,
};
})
......
......@@ -8,6 +8,7 @@
"alerting": false,
"annotations": false,
"logs": true,
"streaming": true,
"info": {
"description": "Like Prometheus but for logs. OSS logging solution from Grafana Labs",
......
......@@ -2,6 +2,9 @@ import { DataQuery, Labels, DataSourceJsonData } from '@grafana/ui/src/types';
export interface LokiQuery extends DataQuery {
expr: string;
live?: boolean;
query?: string;
regexp?: string;
}
export interface LokiOptions extends DataSourceJsonData {
......
......@@ -223,7 +223,7 @@ class PromQueryField extends React.PureComponent<PromQueryFieldProps, PromQueryF
// Send text change to parent
const { query, onChange, onRunQuery } = this.props;
if (onChange) {
const nextQuery: PromQuery = { ...query, expr: value };
const nextQuery: PromQuery = { ...query, expr: value, context: 'explore' };
onChange(nextQuery);
if (override && onRunQuery) {
......
// Libraries
import _ from 'lodash';
import $ from 'jquery';
import { from, Observable } from 'rxjs';
// Services & Utils
import kbn from 'app/core/utils/kbn';
......@@ -14,18 +15,21 @@ import { getQueryHints } from './query_hints';
import { expandRecordingRules } from './language_utils';
// Types
import { PromQuery, PromOptions } from './types';
import { PromQuery, PromOptions, PromQueryRequest } from './types';
import {
DataQueryRequest,
DataSourceApi,
AnnotationEvent,
DataSourceInstanceSettings,
DataQueryError,
DataStreamObserver,
LoadingState,
} from '@grafana/ui/src/types';
import { ExploreUrlState } from 'app/types/explore';
import { safeStringifyValue } from 'app/core/utils/explore';
import { TemplateSrv } from 'app/features/templating/template_srv';
import { TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { single, map, filter } from 'rxjs/operators';
export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions> {
type: string;
......@@ -83,7 +87,7 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
}
}
_request(url, data?, options?: any) {
_request(url: string, data?: any, options?: any) {
options = _.defaults(options || {}, {
url: this.url + url,
method: this.httpMethod,
......@@ -119,11 +123,11 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
}
// Use this for tab completion features, wont publish response to other components
metadataRequest(url) {
metadataRequest(url: string) {
return this._request(url, null, { method: 'GET', silent: true });
}
interpolateQueryExpr(value, variable, defaultFormatFn) {
interpolateQueryExpr(value: any, variable: any, defaultFormatFn: any) {
// if no multi or include all do not regexEscape
if (!variable.multi && !variable.includeAll) {
return prometheusRegularEscape(value);
......@@ -141,34 +145,132 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
return this.templateSrv.variableExists(target.expr);
}
query(options: DataQueryRequest<PromQuery>): Promise<{ data: any }> {
const start = this.getPrometheusTime(options.range.from, false);
const end = this.getPrometheusTime(options.range.to, true);
processResult = (response: any, query: PromQueryRequest, target: PromQuery, responseListLength: number) => {
// Keeping original start/end for transformers
const transformerOptions = {
format: target.format,
step: query.step,
legendFormat: target.legendFormat,
start: query.start,
end: query.end,
query: query.expr,
responseListLength,
refId: target.refId,
valueWithRefId: target.valueWithRefId,
};
const series = this.resultTransformer.transform(response, transformerOptions);
const queries = [];
const activeTargets = [];
return series;
};
options = _.clone(options);
runObserverQueries = (
options: DataQueryRequest<PromQuery>,
observer: DataStreamObserver,
queries: PromQueryRequest[],
activeTargets: PromQuery[],
end: number
) => {
for (let index = 0; index < queries.length; index++) {
const query = queries[index];
const target = activeTargets[index];
let observable: Observable<any> = null;
if (query.instant) {
observable = from(this.performInstantQuery(query, end));
} else {
observable = from(this.performTimeSeriesQuery(query, query.start, query.end));
}
observable
.pipe(
single(), // unsubscribes automatically after first result
filter((response: any) => (response.cancelled ? false : true)),
map((response: any) => {
return this.processResult(response, query, target, queries.length);
})
)
.subscribe({
next: series => {
if (query.instant) {
observer({
key: `prometheus-${target.refId}`,
state: LoadingState.Loading,
request: options,
series: null,
delta: series,
unsubscribe: () => undefined,
});
} else {
observer({
key: `prometheus-${target.refId}`,
state: LoadingState.Done,
request: options,
series: null,
delta: series,
unsubscribe: () => undefined,
});
}
},
});
}
};
prepareTargets = (options: DataQueryRequest<PromQuery>, start: number, end: number) => {
const queries: PromQueryRequest[] = [];
const activeTargets: PromQuery[] = [];
for (const target of options.targets) {
if (!target.expr || target.hide) {
continue;
}
if (target.context === 'explore') {
target.format = 'time_series';
target.instant = false;
const instantTarget: any = _.cloneDeep(target);
instantTarget.format = 'table';
instantTarget.instant = true;
instantTarget.valueWithRefId = true;
delete instantTarget.maxDataPoints;
instantTarget.requestId += '_instant';
instantTarget.refId += '_instant';
activeTargets.push(instantTarget);
queries.push(this.createQuery(instantTarget, options, start, end));
}
activeTargets.push(target);
queries.push(this.createQuery(target, options, start, end));
}
return {
queries,
activeTargets,
};
};
query(options: DataQueryRequest<PromQuery>, observer?: DataStreamObserver): Promise<{ data: any }> {
const start = this.getPrometheusTime(options.range.from, false);
const end = this.getPrometheusTime(options.range.to, true);
options = _.clone(options);
const { queries, activeTargets } = this.prepareTargets(options, start, end);
// No valid targets, return the empty result to save a round trip.
if (_.isEmpty(queries)) {
return this.$q.when({ data: [] }) as Promise<{ data: any }>;
}
if (observer && options.targets.filter(target => target.context === 'explore').length === options.targets.length) {
// using observer to make the instant query return immediately
this.runObserverQueries(options, observer, queries, activeTargets, end);
return this.$q.when({ data: [] }) as Promise<{ data: any }>;
}
const allQueryPromise = _.map(queries, query => {
if (!query.instant) {
return this.performTimeSeriesQuery(query, query.start, query.end);
} else {
if (query.instant) {
return this.performInstantQuery(query, end);
} else {
return this.performTimeSeriesQuery(query, query.start, query.end);
}
});
......@@ -180,19 +282,10 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
return;
}
// Keeping original start/end for transformers
const transformerOptions = {
format: activeTargets[index].format,
step: queries[index].step,
legendFormat: activeTargets[index].legendFormat,
start: queries[index].start,
end: queries[index].end,
query: queries[index].expr,
responseListLength: responseList.length,
refId: activeTargets[index].refId,
valueWithRefId: activeTargets[index].valueWithRefId,
};
const series = this.resultTransformer.transform(response, transformerOptions);
const target = activeTargets[index];
const query = queries[index];
const series = this.processResult(response, query, target, queries.length);
result = [...result, ...series];
});
......@@ -202,10 +295,16 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
return allPromise as Promise<{ data: any }>;
}
createQuery(target, options, start, end) {
const query: any = {
createQuery(target: PromQuery, options: DataQueryRequest<PromQuery>, start: number, end: number) {
const query: PromQueryRequest = {
hinting: target.hinting,
instant: target.instant,
step: 0,
expr: '',
requestId: '',
refId: '',
start: 0,
end: 0,
};
const range = Math.ceil(end - start);
......@@ -398,7 +497,7 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
};
// Unsetting min interval for accurate event resolution
const minStep = '1s';
const query = this.createQuery({ expr, interval: minStep }, queryOptions, start, end);
const query = this.createQuery({ expr, interval: minStep, refId: 'X' }, queryOptions, start, end);
const self = this;
return this.performTimeSeriesQuery(query, query.start, query.end).then(results => {
......
......@@ -2,6 +2,14 @@ import { DataQuery, DataSourceJsonData } from '@grafana/ui/src/types';
export interface PromQuery extends DataQuery {
expr: string;
context?: 'explore' | 'panel';
format?: string;
instant?: boolean;
hinting?: boolean;
interval?: string;
intervalFactor?: number;
legendFormat?: string;
valueWithRefId?: boolean;
}
export interface PromOptions extends DataSourceJsonData {
......@@ -10,3 +18,10 @@ export interface PromOptions extends DataSourceJsonData {
httpMethod: string;
directUrl: string;
}
export interface PromQueryRequest extends PromQuery {
step?: number;
requestId?: string;
start: number;
end: number;
}
......@@ -15,8 +15,22 @@ import usersReducers from 'app/features/users/state/reducers';
import userReducers from 'app/features/profile/state/reducers';
import organizationReducers from 'app/features/org/state/reducers';
import { setStore } from './store';
import { startSubscriptionsEpic, startSubscriptionEpic, limitMessageRateEpic } from 'app/features/explore/state/epics';
import { WebSocketSubject, webSocket } from 'rxjs/webSocket';
import { limitMessageRateEpic } from 'app/features/explore/state/epics/limitMessageRateEpic';
import { stateSaveEpic } from 'app/features/explore/state/epics/stateSaveEpic';
import { processQueryResultsEpic } from 'app/features/explore/state/epics/processQueryResultsEpic';
import { processQueryErrorsEpic } from 'app/features/explore/state/epics/processQueryErrorsEpic';
import { runQueriesEpic } from 'app/features/explore/state/epics/runQueriesEpic';
import { runQueriesBatchEpic } from 'app/features/explore/state/epics/runQueriesBatchEpic';
import {
DataSourceApi,
DataQueryResponse,
DataQuery,
DataSourceJsonData,
DataQueryRequest,
DataStreamObserver,
} from '@grafana/ui';
import { Observable } from 'rxjs';
import { getQueryResponse } from 'app/core/utils/explore';
import { StoreState } from 'app/types/store';
import { toggleLogActionsMiddleware } from 'app/core/middlewares/application';
......@@ -39,14 +53,25 @@ export function addRootReducer(reducers) {
Object.assign(rootReducers, ...reducers);
}
export const rootEpic: any = combineEpics(startSubscriptionsEpic, startSubscriptionEpic, limitMessageRateEpic);
export const rootEpic: any = combineEpics(
limitMessageRateEpic,
stateSaveEpic,
runQueriesEpic,
runQueriesBatchEpic,
processQueryResultsEpic,
processQueryErrorsEpic
);
export interface EpicDependencies {
getWebSocket: <T>(urlConfigOrSource: string) => WebSocketSubject<T>;
getQueryResponse: (
datasourceInstance: DataSourceApi<DataQuery, DataSourceJsonData>,
options: DataQueryRequest<DataQuery>,
observer?: DataStreamObserver
) => Observable<DataQueryResponse>;
}
const dependencies: EpicDependencies = {
getWebSocket: webSocket,
getQueryResponse,
};
const epicMiddleware = createEpicMiddleware({ dependencies });
......
......@@ -3,7 +3,6 @@ import { Value } from 'slate';
import {
RawTimeRange,
DataQuery,
DataQueryResponseData,
DataSourceSelectItem,
DataSourceApi,
QueryHint,
......@@ -13,9 +12,10 @@ import {
DataQueryError,
LogsModel,
LogsDedupStrategy,
LoadingState,
} from '@grafana/ui';
import { Emitter, TimeSeries } from 'app/core/core';
import { Emitter } from 'app/core/core';
import TableModel from 'app/core/table_model';
export enum ExploreMode {
......@@ -215,9 +215,7 @@ export interface ExploreItemState {
*/
showingTable: boolean;
graphIsLoading: boolean;
logIsLoading: boolean;
tableIsLoading: boolean;
loadingState: LoadingState;
/**
* Table model that combines all query table results into a single table.
*/
......@@ -254,6 +252,7 @@ export interface ExploreItemState {
mode: ExploreMode;
isLive: boolean;
urlReplaced: boolean;
}
export interface ExploreUpdateState {
......@@ -314,11 +313,8 @@ export interface QueryIntervals {
export interface QueryOptions {
interval: string;
format: string;
hinting?: boolean;
instant?: boolean;
valueWithRefId?: boolean;
maxDataPoints?: number;
live?: boolean;
}
export interface QueryTransaction {
......@@ -330,23 +326,14 @@ export interface QueryTransaction {
options: any;
queries: DataQuery[];
result?: any; // Table model / Timeseries[] / Logs
resultType: ResultType;
scanning?: boolean;
}
export type RangeScanner = () => RawTimeRange;
export type ResultGetter = (
result: DataQueryResponseData,
transaction: QueryTransaction,
allTransactions: QueryTransaction[]
) => TimeSeries;
export interface TextMatch {
text: string;
start: number;
length: number;
end: number;
}
export type ResultType = 'Graph' | 'Logs' | 'Table';
import { Epic, ActionsObservable, StateObservable } from 'redux-observable';
import { Subject } from 'rxjs';
import { WebSocketSubject } from 'rxjs/webSocket';
import {
DataSourceApi,
DataQuery,
DataSourceJsonData,
DataQueryRequest,
DataStreamObserver,
DataQueryResponse,
DataStreamState,
} from '@grafana/ui';
import { ActionOf } from 'app/core/redux/actionCreatorFactory';
import { StoreState } from 'app/types/store';
......@@ -8,21 +16,30 @@ import { EpicDependencies } from 'app/store/configureStore';
export const epicTester = (
epic: Epic<ActionOf<any>, ActionOf<any>, StoreState, EpicDependencies>,
state?: StoreState
state?: Partial<StoreState>
) => {
const resultingActions: Array<ActionOf<any>> = [];
const action$ = new Subject<ActionOf<any>>();
const state$ = new Subject<StoreState>();
const actionObservable$ = new ActionsObservable(action$);
const stateObservable$ = new StateObservable(state$, state || ({} as StoreState));
const websockets$: Array<Subject<any>> = [];
const stateObservable$ = new StateObservable(state$, (state as StoreState) || ({} as StoreState));
const queryResponse$ = new Subject<DataQueryResponse>();
const observer$ = new Subject<DataStreamState>();
const getQueryResponse = (
datasourceInstance: DataSourceApi<DataQuery, DataSourceJsonData>,
options: DataQueryRequest<DataQuery>,
observer?: DataStreamObserver
) => {
if (observer) {
observer$.subscribe({ next: event => observer(event) });
}
return queryResponse$;
};
const dependencies: EpicDependencies = {
getWebSocket: () => {
const webSocket$ = new Subject<any>();
websockets$.push(webSocket$);
return webSocket$ as WebSocketSubject<any>;
},
getQueryResponse,
};
epic(actionObservable$, stateObservable$, dependencies).subscribe({ next: action => resultingActions.push(action) });
const whenActionIsDispatched = (action: ActionOf<any>) => {
......@@ -31,14 +48,26 @@ export const epicTester = (
return instance;
};
const whenWebSocketReceivesData = (data: any) => {
websockets$.forEach(websocket$ => websocket$.next(data));
const whenQueryReceivesResponse = (response: DataQueryResponse) => {
queryResponse$.next(response);
return instance;
};
const whenQueryThrowsError = (error: any) => {
queryResponse$.error(error);
return instance;
};
const whenQueryObserverReceivesEvent = (event: DataStreamState) => {
observer$.next(event);
return instance;
};
const thenResultingActionsEqual = (...actions: Array<ActionOf<any>>) => {
expect(resultingActions).toEqual(actions);
expect(actions).toEqual(resultingActions);
return instance;
};
......@@ -51,7 +80,9 @@ export const epicTester = (
const instance = {
whenActionIsDispatched,
whenWebSocketReceivesData,
whenQueryReceivesResponse,
whenQueryThrowsError,
whenQueryObserverReceivesEvent,
thenResultingActionsEqual,
thenNoActionsWhereDispatched,
};
......
import { DataSourceApi } from '@grafana/ui/src/types/datasource';
import { ExploreId, ExploreItemState, ExploreState } from 'app/types/explore';
import { makeExploreItemState } from 'app/features/explore/state/reducers';
import { StoreState } from 'app/types';
export const mockExploreState = (options: any = {}) => {
const isLive = options.isLive || false;
const history = [];
const eventBridge = {
emit: jest.fn(),
};
const streaming = options.streaming || undefined;
const datasourceInterval = options.datasourceInterval || '';
const refreshInterval = options.refreshInterval || '';
const containerWidth = options.containerWidth || 1980;
const queries = options.queries || [];
const datasourceError = options.datasourceError || null;
const scanner = options.scanner || jest.fn();
const scanning = options.scanning || false;
const datasourceId = options.datasourceId || '1337';
const exploreId = ExploreId.left;
const datasourceInstance: DataSourceApi<any> = options.datasourceInstance || {
id: 1337,
query: jest.fn(),
name: 'test',
testDatasource: jest.fn(),
meta: {
id: datasourceId,
streaming,
},
interval: datasourceInterval,
};
const urlReplaced = options.urlReplaced || false;
const left: ExploreItemState = options.left || {
...makeExploreItemState(),
containerWidth,
datasourceError,
datasourceInstance,
eventBridge,
history,
isLive,
queries,
refreshInterval,
scanner,
scanning,
urlReplaced,
};
const right: ExploreItemState = options.right || {
...makeExploreItemState(),
containerWidth,
datasourceError,
datasourceInstance,
eventBridge,
history,
isLive,
queries,
refreshInterval,
scanner,
scanning,
urlReplaced,
};
const split: boolean = options.split || false;
const explore: ExploreState = {
left,
right,
split,
};
const state: Partial<StoreState> = {
explore,
};
return {
containerWidth,
datasourceId,
datasourceInstance,
datasourceInterval,
eventBridge,
exploreId,
history,
queries,
refreshInterval,
state,
scanner,
};
};
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment