Commit aafae4c5 by Torkel Ödegaard Committed by GitHub

Explore: Use DataFrame to derive graph/table/logs (#18859)

* WIP: Use data frames in explore

* Explore: everything seems to be working again

* Reworked ResultProcessor tests

* Fixed unit test

* Add some typings and comments
parent 364d2358
...@@ -16,7 +16,6 @@ import { ...@@ -16,7 +16,6 @@ import {
LogsMetaKind, LogsMetaKind,
LogsDedupStrategy, LogsDedupStrategy,
GraphSeriesXY, GraphSeriesXY,
LoadingState,
dateTime, dateTime,
toUtc, toUtc,
NullValueMode, NullValueMode,
...@@ -193,7 +192,7 @@ export function dataFrameToLogsModel(dataFrame: DataFrame[], intervalMs: number) ...@@ -193,7 +192,7 @@ export function dataFrameToLogsModel(dataFrame: DataFrame[], intervalMs: number)
logsModel.series = makeSeriesForLogs(logsModel.rows, intervalMs); logsModel.series = makeSeriesForLogs(logsModel.rows, intervalMs);
} else { } else {
logsModel.series = getGraphSeriesModel( logsModel.series = getGraphSeriesModel(
{ series: metricSeries, state: LoadingState.Done }, metricSeries,
{}, {},
{ showBars: true, showLines: false, showPoints: false }, { showBars: true, showLines: false, showPoints: false },
{ {
......
...@@ -274,7 +274,9 @@ export class PanelQueryState { ...@@ -274,7 +274,9 @@ export class PanelQueryState {
return { return {
state: done ? LoadingState.Done : LoadingState.Streaming, state: done ? LoadingState.Done : LoadingState.Streaming,
series, // Union of series from response and all streams // This should not be needed but unfortunately Prometheus datasource sends non DataFrame here bypassing the
// typing.
series: this.sendFrames ? getProcessedDataFrames(series) : [],
legacy: this.sendLegacy ? translateToLegacyData(series) : undefined, legacy: this.sendLegacy ? translateToLegacyData(series) : undefined,
request: { request: {
...this.request, ...this.request,
......
...@@ -611,13 +611,10 @@ export const processQueryResponse = ( ...@@ -611,13 +611,10 @@ export const processQueryResponse = (
} }
const latency = request.endTime - request.startTime; const latency = request.endTime - request.startTime;
const processor = new ResultProcessor(state, replacePreviousResults, series);
// temporary hack until we switch to PanelData, Loki already converts to DataFrame so using legacy will destroy the format
const isLokiDataSource = state.datasourceInstance.meta.name === 'Loki';
const processor = new ResultProcessor(state, replacePreviousResults, isLokiDataSource ? series : legacy);
// For Angular editors // For Angular editors
state.eventBridge.emit('data-received', processor.getRawData()); state.eventBridge.emit('data-received', legacy);
return { return {
...state, ...state,
......
import { DataQueryResponse, DataQueryResponseData } from '@grafana/ui'; import { LogsModel, GraphSeriesXY, DataFrame, FieldType } from '@grafana/data';
import {
TableData,
isTableData,
LogsModel,
toDataFrame,
guessFieldTypes,
TimeSeries,
GraphSeriesXY,
LoadingState,
} from '@grafana/data';
import { ExploreItemState, ExploreMode } from 'app/types/explore'; import { ExploreItemState, ExploreMode } from 'app/types/explore';
import { getProcessedDataFrames } from 'app/features/dashboard/state/PanelQueryState';
import TableModel, { mergeTablesIntoModel } from 'app/core/table_model'; import TableModel, { mergeTablesIntoModel } from 'app/core/table_model';
import { sortLogsResult, refreshIntervalToSortOrder } from 'app/core/utils/explore'; import { sortLogsResult, refreshIntervalToSortOrder } from 'app/core/utils/explore';
import { dataFrameToLogsModel } from 'app/core/logs_model'; import { dataFrameToLogsModel } from 'app/core/logs_model';
import { getGraphSeriesModel } from 'app/plugins/panel/graph2/getGraphSeriesModel'; import { getGraphSeriesModel } from 'app/plugins/panel/graph2/getGraphSeriesModel';
export class ResultProcessor { export class ResultProcessor {
private rawData: DataQueryResponseData[] = [];
private metrics: TimeSeries[] = [];
private tables: TableData[] = [];
constructor( constructor(
private state: ExploreItemState, private state: ExploreItemState,
private replacePreviousResults: boolean, private replacePreviousResults: boolean,
result?: DataQueryResponse | DataQueryResponseData[] private dataFrames: DataFrame[]
) { ) {}
if (result && result.hasOwnProperty('data')) {
this.rawData = (result as DataQueryResponse).data;
} else {
this.rawData = (result as DataQueryResponseData[]) || [];
}
if (this.state.mode !== ExploreMode.Metrics) { getGraphResult(): GraphSeriesXY[] {
return;
}
for (let index = 0; index < this.rawData.length; index++) {
const res: any = this.rawData[index];
const isTable = isTableData(res);
if (isTable) {
this.tables.push(res);
} else {
this.metrics.push(res);
}
}
}
getRawData = (): any[] => {
return this.rawData;
};
getGraphResult = (): GraphSeriesXY[] => {
if (this.state.mode !== ExploreMode.Metrics) { if (this.state.mode !== ExploreMode.Metrics) {
return []; return [];
} }
const newResults = this.createGraphSeries(this.metrics); const onlyTimeSeries = this.dataFrames.filter(series => series.fields.length === 2);
return this.mergeGraphResults(newResults, this.state.graphResult);
};
getTableResult = (): TableModel => { return getGraphSeriesModel(
onlyTimeSeries,
{},
{ showBars: false, showLines: true, showPoints: false },
{ asTable: false, isVisible: true, placement: 'under' }
);
}
getTableResult(): TableModel {
if (this.state.mode !== ExploreMode.Metrics) { if (this.state.mode !== ExploreMode.Metrics) {
return new TableModel(); return new TableModel();
} }
const prevTableResults: any[] | TableModel = this.state.tableResult || []; // For now ignore time series
const tablesToMerge = this.replacePreviousResults ? this.tables : [].concat(prevTableResults, this.tables); // We can change this later, just need to figure out how to
// Ignore time series only for prometheus
const onlyTables = this.dataFrames.filter(frame => {
if (frame.fields.length === 2) {
if (frame.fields[1].type === FieldType.time) {
return false;
}
}
return true;
});
const tables = onlyTables.map(frame => {
const { fields } = frame;
const fieldCount = fields.length;
const rowCount = fields[0].values.length;
const columns = fields.map(field => ({
text: field.name,
type: field.type,
filterable: field.config.filterable,
}));
const rows: any[][] = [];
for (let i = 0; i < rowCount; i++) {
const row: any[] = [];
for (let j = 0; j < fieldCount; j++) {
row.push(frame.fields[j].values.get(i));
}
rows.push(row);
}
return new TableModel({
columns,
rows,
meta: frame.meta,
});
});
return mergeTablesIntoModel(new TableModel(), ...tablesToMerge); return mergeTablesIntoModel(new TableModel(), ...tables);
}; }
getLogsResult = (): LogsModel => { getLogsResult(): LogsModel {
if (this.state.mode !== ExploreMode.Logs) { if (this.state.mode !== ExploreMode.Logs) {
return null; return null;
} }
const graphInterval = this.state.queryIntervals.intervalMs; const graphInterval = this.state.queryIntervals.intervalMs;
const dataFrame = this.rawData.map(result => guessFieldTypes(toDataFrame(result)));
const newResults = this.rawData ? dataFrameToLogsModel(dataFrame, graphInterval) : null; const newResults = dataFrameToLogsModel(this.dataFrames, graphInterval);
const sortOrder = refreshIntervalToSortOrder(this.state.refreshInterval); const sortOrder = refreshIntervalToSortOrder(this.state.refreshInterval);
const sortedNewResults = sortLogsResult(newResults, sortOrder); const sortedNewResults = sortLogsResult(newResults, sortOrder);
...@@ -94,7 +97,6 @@ export class ResultProcessor { ...@@ -94,7 +97,6 @@ export class ResultProcessor {
const prevLogsResult: LogsModel = this.state.logsResult || { hasUniqueLabels: false, rows: [] }; const prevLogsResult: LogsModel = this.state.logsResult || { hasUniqueLabels: false, rows: [] };
const sortedLogResult = sortLogsResult(prevLogsResult, sortOrder); const sortedLogResult = sortLogsResult(prevLogsResult, sortOrder);
const rowsInState = sortedLogResult.rows; const rowsInState = sortedLogResult.rows;
const seriesInState = sortedLogResult.series || [];
const processedRows = []; const processedRows = [];
for (const row of rowsInState) { for (const row of rowsInState) {
...@@ -104,78 +106,10 @@ export class ResultProcessor { ...@@ -104,78 +106,10 @@ export class ResultProcessor {
processedRows.push({ ...row, fresh: true }); processedRows.push({ ...row, fresh: true });
} }
const processedSeries = this.mergeGraphResults(sortedNewResults.series, seriesInState);
const slice = -1000; const slice = -1000;
const rows = processedRows.slice(slice); const rows = processedRows.slice(slice);
const series = processedSeries.slice(slice); const series = sortedNewResults.series.slice(slice);
return { ...sortedNewResults, rows, series }; return { ...sortedNewResults, rows, series };
}; }
private createGraphSeries = (rawData: any[]) => {
const dataFrames = getProcessedDataFrames(rawData);
const graphSeries = getGraphSeriesModel(
{ series: dataFrames, state: LoadingState.Done },
{},
{ showBars: false, showLines: true, showPoints: false },
{
asTable: false,
isVisible: true,
placement: 'under',
}
);
return graphSeries;
};
private isSameGraphSeries = (a: GraphSeriesXY, b: GraphSeriesXY) => {
if (a.hasOwnProperty('label') && b.hasOwnProperty('label')) {
const aValue = a.label;
const bValue = b.label;
if (aValue !== undefined && bValue !== undefined && aValue === bValue) {
return true;
}
}
return false;
};
private mergeGraphResults = (newResults: GraphSeriesXY[], prevResults: GraphSeriesXY[]): GraphSeriesXY[] => {
if (!prevResults || prevResults.length === 0 || this.replacePreviousResults) {
return newResults; // Hack before we use GraphSeriesXY instead
}
const results: GraphSeriesXY[] = prevResults.slice() as GraphSeriesXY[];
// update existing results
for (let index = 0; index < results.length; index++) {
const prevResult = results[index];
for (const newResult of newResults) {
const isSame = this.isSameGraphSeries(prevResult, newResult);
if (isSame) {
prevResult.data = prevResult.data.concat(newResult.data);
break;
}
}
}
// add new results
for (const newResult of newResults) {
let isNew = true;
for (const prevResult of results) {
const isSame = this.isSameGraphSeries(prevResult, newResult);
if (isSame) {
isNew = false;
break;
}
}
if (isNew) {
results.push(newResult);
}
}
return results;
};
} }
...@@ -181,26 +181,6 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions> ...@@ -181,26 +181,6 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
activeTargets: PromQuery[], activeTargets: PromQuery[],
end: number end: number
) => { ) => {
// Because we want to get run instant and TimeSeries Prom queries in parallel but this isn't actually streaming
// we need to stop/cancel each posted event with a stop stream event (see below) to the observer so that the
// PanelQueryState stops the stream
const getStopState = (state: DataStreamState): DataStreamState => ({
...state,
state: LoadingState.Done,
request: { ...options, requestId: 'done' },
});
const startLoadingEvent: DataStreamState = {
key: `prometheus-loading_indicator`,
state: LoadingState.Loading,
request: options,
data: [],
unsubscribe: () => undefined,
};
observer(startLoadingEvent); // Starts the loading indicator
const lastTimeSeriesQuery = queries.filter(query => !query.instant).pop();
for (let index = 0; index < queries.length; index++) { for (let index = 0; index < queries.length; index++) {
const query = queries[index]; const query = queries[index];
const target = activeTargets[index]; const target = activeTargets[index];
...@@ -220,19 +200,15 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions> ...@@ -220,19 +200,15 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
const data = this.processResult(response, query, target, queries.length); const data = this.processResult(response, query, target, queries.length);
const state: DataStreamState = { const state: DataStreamState = {
key: `prometheus-${target.refId}`, key: `prometheus-${target.refId}`,
state: LoadingState.Loading, state: LoadingState.Done,
request: options, request: options,
data, // TODO this is obviously wrong as data is not a DataFrame and needs to be dealt with later on
// in PanelQueryState
data: data as any,
unsubscribe: () => undefined, unsubscribe: () => undefined,
}; };
const states = [state, getStopState(state)]; return [state];
if (target.refId === lastTimeSeriesQuery.refId && target.expr === lastTimeSeriesQuery.expr) {
states.push(getStopState(startLoadingEvent)); // Stops the loading indicator
}
return states;
}), }),
catchError(err => { catchError(err => {
const error = this.handleErrors(err, target); const error = this.handleErrors(err, target);
...@@ -306,6 +282,7 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions> ...@@ -306,6 +282,7 @@ export class PrometheusDatasource extends DataSourceApi<PromQuery, PromOptions>
this.runObserverQueries(options, observer, queries, activeTargets, end); this.runObserverQueries(options, observer, queries, activeTargets, end);
return this.$q.when({ data: [] }) as Promise<{ data: any }>; return this.$q.when({ data: [] }) as Promise<{ data: any }>;
} }
const allQueryPromise = _.map(queries, query => { const allQueryPromise = _.map(queries, query => {
if (query.instant) { if (query.instant) {
return this.performInstantQuery(query, end); return this.performInstantQuery(query, end);
......
...@@ -6,7 +6,7 @@ import { TemplateSrv } from 'app/features/templating/template_srv'; ...@@ -6,7 +6,7 @@ import { TemplateSrv } from 'app/features/templating/template_srv';
export class ResultTransformer { export class ResultTransformer {
constructor(private templateSrv: TemplateSrv) {} constructor(private templateSrv: TemplateSrv) {}
transform(response: any, options: any): any[] { transform(response: any, options: any): Array<TableModel | TimeSeries> {
const prometheusResult = response.data.data.result; const prometheusResult = response.data.data.result;
if (options.format === 'table') { if (options.format === 'table') {
...@@ -80,7 +80,7 @@ export class ResultTransformer { ...@@ -80,7 +80,7 @@ export class ResultTransformer {
}; };
} }
transformMetricDataToTable(md: any, resultCount: number, refId: string, valueWithRefId?: boolean) { transformMetricDataToTable(md: any, resultCount: number, refId: string, valueWithRefId?: boolean): TableModel {
const table = new TableModel(); const table = new TableModel();
let i: number, j: number; let i: number, j: number;
const metricLabels: { [key: string]: number } = {}; const metricLabels: { [key: string]: number } = {};
......
...@@ -35,7 +35,7 @@ export class GraphPanelController extends React.Component<GraphPanelControllerPr ...@@ -35,7 +35,7 @@ export class GraphPanelController extends React.Component<GraphPanelControllerPr
this.state = { this.state = {
graphSeriesModel: getGraphSeriesModel( graphSeriesModel: getGraphSeriesModel(
props.data, props.data.series,
props.options.series, props.options.series,
props.options.graph, props.options.graph,
props.options.legend props.options.legend
...@@ -47,7 +47,7 @@ export class GraphPanelController extends React.Component<GraphPanelControllerPr ...@@ -47,7 +47,7 @@ export class GraphPanelController extends React.Component<GraphPanelControllerPr
return { return {
...state, ...state,
graphSeriesModel: getGraphSeriesModel( graphSeriesModel: getGraphSeriesModel(
props.data, props.data.series,
props.options.series, props.options.series,
props.options.graph, props.options.graph,
props.options.legend props.options.legend
......
import { colors, getFlotPairs, getColorFromHexRgbOrName, getDisplayProcessor, PanelData } from '@grafana/ui'; import { colors, getFlotPairs, getColorFromHexRgbOrName, getDisplayProcessor } from '@grafana/ui';
import { NullValueMode, reduceField, FieldType, DisplayValue, GraphSeriesXY, getTimeField } from '@grafana/data'; import {
NullValueMode,
reduceField,
FieldType,
DisplayValue,
GraphSeriesXY,
getTimeField,
DataFrame,
} from '@grafana/data';
import { SeriesOptions, GraphOptions } from './types'; import { SeriesOptions, GraphOptions } from './types';
import { GraphLegendEditorLegendOptions } from './GraphLegendEditor'; import { GraphLegendEditorLegendOptions } from './GraphLegendEditor';
export const getGraphSeriesModel = ( export const getGraphSeriesModel = (
data: PanelData, dataFrames: DataFrame[],
seriesOptions: SeriesOptions, seriesOptions: SeriesOptions,
graphOptions: GraphOptions, graphOptions: GraphOptions,
legendOptions: GraphLegendEditorLegendOptions legendOptions: GraphLegendEditorLegendOptions
...@@ -18,7 +26,7 @@ export const getGraphSeriesModel = ( ...@@ -18,7 +26,7 @@ export const getGraphSeriesModel = (
}, },
}); });
for (const series of data.series) { for (const series of dataFrames) {
const { timeField } = getTimeField(series); const { timeField } = getTimeField(series);
if (!timeField) { if (!timeField) {
continue; continue;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment