Commit 68944f82 by Kay Delaney, committed by GitHub

Datasource/Loki: Fixes regression where enhanceDataFrame was not called (#20660)

* Datasource/Loki: Fixes regression where enhanceDataFrame was not called
Closes #20642
parent d571e29a
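
The gist of the change, as a minimal sketch (simplified stand-in types and hypothetical helper names; the real code is in result_transformer.ts in the diff below): enhanceDataFrame moves out of the LokiDatasource class into the result transformer and is now called for every frame built from a Loki stream, with the datasource's jsonData threaded through as a new config parameter.

// Sketch only: DataFrame and LokiOptions are simplified stand-ins, and streamToFrame
// stands in for lokiStreamResultToDataFrame / legacyLogStreamToDataFrame.
type DataFrame = { refId?: string; fields: Array<{ name: string; values: string[] }> };
type LokiOptions = { derivedFields?: Array<{ name: string; matcherRegex: string; url?: string }> };

declare function streamToFrame(stream: unknown, reverse?: boolean): DataFrame;
declare function enhanceDataFrame(frame: DataFrame, config: LokiOptions | null): void;

function streamsToFrames(streams: unknown[], refId: string, config: LokiOptions, reverse = false): DataFrame[] {
  return streams.map(stream => {
    const frame = streamToFrame(stream, reverse);
    // This is the call the regression dropped: derived fields and DataLinks are applied
    // again, using the datasource's jsonData passed in as `config`.
    enhanceDataFrame(frame, config);
    return { ...frame, refId };
  });
}
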
// Libraries
import { isEmpty, map as lodashMap, fromPairs } from 'lodash';
import { isEmpty, map as lodashMap } from 'lodash';
import { Observable, from, merge, of, iif, defer } from 'rxjs';
import { map, filter, catchError, switchMap, mergeMap } from 'rxjs/operators';
@@ -14,7 +14,7 @@ import {
processRangeQueryResponse,
legacyLogStreamToDataFrame,
lokiStreamResultToDataFrame,
isLokiLogsStream,
lokiLegacyStreamsToDataframes,
} from './result_transformer';
import { formatQuery, parseQuery, getHighlighterExpressionsFromQuery } from './query_utils';
@@ -26,10 +26,6 @@ import {
AnnotationEvent,
DataFrameView,
TimeRange,
FieldConfig,
ArrayVector,
FieldType,
DataFrame,
TimeSeries,
PluginMeta,
DataSourceApi,
@@ -49,7 +45,6 @@ import {
LokiResultType,
LokiRangeQueryRequest,
LokiStreamResponse,
LokiLegacyStreamResult,
} from './types';
import { ExploreMode } from 'app/types';
import { LegacyTarget, LiveStreams } from './live_streams';
@@ -195,43 +190,18 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
catchError((err: any) => this.throwUnless(err, err.cancelled, target)),
filter((response: any) => !response.cancelled),
map((response: { data: LokiLegacyStreamResponse }) => ({
data: this.lokiLegacyStreamsToDataframes(response.data, query, this.maxLines, options.reverse),
data: lokiLegacyStreamsToDataframes(
response.data,
query,
this.maxLines,
this.instanceSettings.jsonData,
options.reverse
),
key: `${target.refId}_log`,
}))
);
};
lokiLegacyStreamsToDataframes = (
data: LokiLegacyStreamResult | LokiLegacyStreamResponse,
target: { refId: string; query?: string; regexp?: string },
limit: number,
reverse = false
): DataFrame[] => {
if (Object.keys(data).length === 0) {
return [];
}
if (isLokiLogsStream(data)) {
return [legacyLogStreamToDataFrame(data, false, target.refId)];
}
const series: DataFrame[] = data.streams.map(stream => {
const dataFrame = legacyLogStreamToDataFrame(stream, reverse);
this.enhanceDataFrame(dataFrame);
return {
...dataFrame,
refId: target.refId,
meta: {
searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.query, target.regexp)),
limit: this.maxLines,
},
};
});
return series;
};
runInstantQuery = (
target: LokiQuery,
options: DataQueryRequest<LokiQuery>,
@@ -309,7 +279,15 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
() => response.status === 404,
defer(() => this.runLegacyQuery(target, options)),
defer(() =>
processRangeQueryResponse(response.data, target, query, responseListLength, this.maxLines, options.reverse)
processRangeQueryResponse(
response.data,
target,
query,
responseListLength,
this.maxLines,
this.instanceSettings.jsonData,
options.reverse
)
)
)
)
@@ -607,51 +585,6 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
return annotations;
}
/**
* Adds new fields and DataLinks to DataFrame based on DataSource instance config.
* @param dataFrame
*/
enhanceDataFrame(dataFrame: DataFrame): void {
if (!this.instanceSettings.jsonData) {
return;
}
const derivedFields = this.instanceSettings.jsonData.derivedFields || [];
if (derivedFields.length) {
const fields = fromPairs(
derivedFields.map(field => {
const config: FieldConfig = {};
if (field.url) {
config.links = [
{
url: field.url,
title: '',
},
];
}
const dataFrameField = {
name: field.name,
type: FieldType.string,
config,
values: new ArrayVector<string>([]),
};
return [field.name, dataFrameField];
})
);
const view = new DataFrameView(dataFrame);
view.forEachRow((row: { line: string }) => {
for (const field of derivedFields) {
const logMatch = row.line.match(field.matcherRegex);
fields[field.name].values.add(logMatch && logMatch[1]);
}
});
dataFrame.fields = [...dataFrame.fields, ...Object.values(fields)];
}
}
throwUnless = (err: any, condition: boolean, target: LokiQuery) => {
if (condition) {
return of(err);
......
import { legacyLogStreamToDataFrame, appendLegacyResponseToBufferedData } from './result_transformer';
import { FieldType, MutableDataFrame } from '@grafana/data';
import { LokiLegacyStreamResult } from './types';
import { LokiLegacyStreamResult, LokiStreamResult } from './types';
import * as ResultTransformer from './result_transformer';
const streams: LokiLegacyStreamResult[] = [
const legacyStreamResult: LokiLegacyStreamResult[] = [
{
labels: '{foo="bar"}',
entries: [
@@ -23,35 +23,124 @@ const streams: LokiLegacyStreamResult[] = [
},
];
describe('logStreamToDataFrame', () => {
it('converts streams to series', () => {
const data = streams.map(stream => legacyLogStreamToDataFrame(stream));
expect(data.length).toBe(2);
expect(data[0].fields[1].labels['foo']).toEqual('bar');
expect(data[0].fields[0].values.get(0)).toEqual(streams[0].entries[0].ts);
expect(data[0].fields[1].values.get(0)).toEqual(streams[0].entries[0].line);
expect(data[0].fields[2].values.get(0)).toEqual('1970-01-01T00:00:00Z_{foo="bar"}');
expect(data[1].fields[0].values.get(0)).toEqual(streams[1].entries[0].ts);
expect(data[1].fields[1].values.get(0)).toEqual(streams[1].entries[0].line);
expect(data[1].fields[2].values.get(0)).toEqual('1970-01-01T00:00:00Z_{bar="foo"}');
const streamResult: LokiStreamResult[] = [
{
stream: {
foo: 'bar',
},
values: [['1970-01-01T00:00:00Z', "foo: 'bar'"]],
},
{
stream: {
bar: 'foo',
},
values: [['1970-01-01T00:00:00Z', "bar: 'foo'"]],
},
];
describe('loki result transformer', () => {
afterAll(() => {
jest.restoreAllMocks();
});
});
describe('appendResponseToBufferedData', () => {
it('appends response', () => {
const data = new MutableDataFrame();
data.addField({ name: 'ts', type: FieldType.time, config: { title: 'Time' } });
data.addField({ name: 'line', type: FieldType.string });
data.addField({ name: 'labels', type: FieldType.other });
data.addField({ name: 'id', type: FieldType.string });
appendLegacyResponseToBufferedData({ streams }, data);
expect(data.get(0)).toEqual({
ts: '1970-01-01T00:00:00Z',
line: "foo: 'bar'",
labels: { foo: 'bar' },
id: '1970-01-01T00:00:00Z_{foo="bar"}',
afterEach(() => {
jest.clearAllMocks();
});
describe('legacyLogStreamToDataFrame', () => {
it('converts streams to series', () => {
const data = legacyStreamResult.map(stream => ResultTransformer.legacyLogStreamToDataFrame(stream));
expect(data.length).toBe(2);
expect(data[0].fields[1].labels['foo']).toEqual('bar');
expect(data[0].fields[0].values.get(0)).toEqual(legacyStreamResult[0].entries[0].ts);
expect(data[0].fields[1].values.get(0)).toEqual(legacyStreamResult[0].entries[0].line);
expect(data[0].fields[2].values.get(0)).toEqual('1970-01-01T00:00:00Z_{foo="bar"}');
expect(data[1].fields[0].values.get(0)).toEqual(legacyStreamResult[1].entries[0].ts);
expect(data[1].fields[1].values.get(0)).toEqual(legacyStreamResult[1].entries[0].line);
expect(data[1].fields[2].values.get(0)).toEqual('1970-01-01T00:00:00Z_{bar="foo"}');
});
});
describe('lokiLegacyStreamsToDataframes', () => {
it('should enhance data frames', () => {
jest.spyOn(ResultTransformer, 'enhanceDataFrame');
const dataFrames = ResultTransformer.lokiLegacyStreamsToDataframes(
{ streams: legacyStreamResult },
{ refId: 'A' },
500,
{
derivedFields: [
{
matcherRegex: 'tracer=(w+)',
name: 'test',
url: 'example.com',
},
],
}
);
expect(ResultTransformer.enhanceDataFrame).toBeCalled();
dataFrames.forEach(frame => {
expect(
frame.fields.filter(field => field.name === 'test' && field.type === 'string').length
).toBeGreaterThanOrEqual(1);
});
});
});
describe('lokiStreamResultToDataFrame', () => {
it('converts streams to series', () => {
const data = streamResult.map(stream => ResultTransformer.lokiStreamResultToDataFrame(stream));
expect(data.length).toBe(2);
expect(data[0].fields[1].labels['foo']).toEqual('bar');
expect(data[0].fields[0].values.get(0)).toEqual(legacyStreamResult[0].entries[0].ts);
expect(data[0].fields[1].values.get(0)).toEqual(legacyStreamResult[0].entries[0].line);
expect(data[0].fields[2].values.get(0)).toEqual('1970-01-01T00:00:00Z_{foo="bar"}');
expect(data[1].fields[0].values.get(0)).toEqual(legacyStreamResult[1].entries[0].ts);
expect(data[1].fields[1].values.get(0)).toEqual(legacyStreamResult[1].entries[0].line);
expect(data[1].fields[2].values.get(0)).toEqual('1970-01-01T00:00:00Z_{bar="foo"}');
});
});
describe('lokiStreamsToDataframes', () => {
it('should enhance data frames', () => {
jest.spyOn(ResultTransformer, 'enhanceDataFrame');
const dataFrames = ResultTransformer.lokiStreamsToDataframes(streamResult, { refId: 'B' }, 500, {
derivedFields: [
{
matcherRegex: 'trace=(w+)',
name: 'test',
url: 'example.com',
},
],
});
expect(ResultTransformer.enhanceDataFrame).toBeCalled();
dataFrames.forEach(frame => {
expect(
frame.fields.filter(field => field.name === 'test' && field.type === 'string').length
).toBeGreaterThanOrEqual(1);
});
});
});
describe('appendResponseToBufferedData', () => {
it('appends response', () => {
const data = new MutableDataFrame();
data.addField({ name: 'ts', type: FieldType.time, config: { title: 'Time' } });
data.addField({ name: 'line', type: FieldType.string });
data.addField({ name: 'labels', type: FieldType.other });
data.addField({ name: 'id', type: FieldType.string });
ResultTransformer.appendLegacyResponseToBufferedData({ streams: legacyStreamResult }, data);
expect(data.get(0)).toEqual({
ts: '1970-01-01T00:00:00Z',
line: "foo: 'bar'",
labels: { foo: 'bar' },
id: '1970-01-01T00:00:00Z_{foo="bar"}',
});
});
});
});
@@ -10,6 +10,8 @@ import {
MutableDataFrame,
findUniqueLabels,
dateTime,
FieldConfig,
DataFrameView,
} from '@grafana/data';
import templateSrv from 'app/features/templating/template_srv';
import TableModel from 'app/core/table_model';
@@ -25,6 +27,7 @@ import {
LokiStreamResult,
LokiTailResponse,
LokiQuery,
LokiOptions,
} from './types';
import { formatQuery, getHighlighterExpressionsFromQuery } from './query_utils';
@@ -77,12 +80,16 @@ export function lokiStreamResultToDataFrame(stream: LokiStreamResult, reverse?:
const uids = new ArrayVector<string>([]);
for (const [ts, line] of stream.values) {
times.add(dateTime(Number.parseFloat(ts) / 1e6).format('YYYY-MM-DD HH:mm:ss'));
times.add(
dateTime(Number.parseFloat(ts) / 1e6)
.utc()
.format()
);
lines.add(line);
uids.add(
`${ts}_${Object.entries(labels)
.map(([key, val]) => `${key}=${val}`)
.join('')}`
`${ts}_{${Object.entries(labels)
.map(([key, val]) => `${key}="${val}"`)
.join('')}}`
);
}
@@ -301,16 +308,21 @@ export function lokiStreamsToDataframes(
data: LokiStreamResult[],
target: { refId: string; expr?: string; regexp?: string },
limit: number,
config: LokiOptions,
reverse = false
): DataFrame[] {
const series: DataFrame[] = data.map(stream => ({
...lokiStreamResultToDataFrame(stream, reverse),
refId: target.refId,
meta: {
searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.expr, target.regexp)),
limit,
},
}));
const series: DataFrame[] = data.map(stream => {
const dataFrame = lokiStreamResultToDataFrame(stream, reverse);
enhanceDataFrame(dataFrame, config);
return {
...dataFrame,
refId: target.refId,
meta: {
searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.expr, target.regexp)),
limit,
},
};
});
return series;
}
@@ -319,6 +331,7 @@ export function lokiLegacyStreamsToDataframes(
data: LokiLegacyStreamResult | LokiLegacyStreamResponse,
target: { refId: string; query?: string; regexp?: string },
limit: number,
config: LokiOptions,
reverse = false
): DataFrame[] {
if (Object.keys(data).length === 0) {
@@ -326,21 +339,72 @@ }
}
if (isLokiLogsStream(data)) {
return [legacyLogStreamToDataFrame(data, reverse, target.refId)];
return [legacyLogStreamToDataFrame(data, false, target.refId)];
}
const series: DataFrame[] = data.streams.map(stream => ({
...legacyLogStreamToDataFrame(stream, reverse),
refId: target.refId,
meta: {
searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.query, target.regexp)),
limit,
},
}));
const series: DataFrame[] = data.streams.map(stream => {
const dataFrame = legacyLogStreamToDataFrame(stream, reverse);
enhanceDataFrame(dataFrame, config);
return {
...dataFrame,
refId: target.refId,
meta: {
searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.query, target.regexp)),
limit,
},
};
});
return series;
}
/**
* Adds new fields and DataLinks to DataFrame based on DataSource instance config.
* @param dataFrame
*/
export const enhanceDataFrame = (dataFrame: DataFrame, config: LokiOptions | null): void => {
if (!config) {
return;
}
const derivedFields = config.derivedFields ?? [];
if (!derivedFields.length) {
return;
}
const fields = derivedFields.reduce((acc, field) => {
const config: FieldConfig = {};
if (field.url) {
config.links = [
{
url: field.url,
title: '',
},
];
}
const dataFrameField = {
name: field.name,
type: FieldType.string,
config,
values: new ArrayVector<string>([]),
};
acc[field.name] = dataFrameField;
return acc;
}, {} as Record<string, any>);
const view = new DataFrameView(dataFrame);
view.forEachRow((row: { line: string }) => {
for (const field of derivedFields) {
const logMatch = row.line.match(field.matcherRegex);
fields[field.name].values.add(logMatch && logMatch[1]);
}
});
dataFrame.fields = [...dataFrame.fields, ...Object.values(fields)];
};
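
A hedged usage sketch (not part of this file's diff), mirroring the new tests above; the log line, regex, and url below are illustrative values only:

import { MutableDataFrame, FieldType } from '@grafana/data';
import { enhanceDataFrame } from './result_transformer';

const frame = new MutableDataFrame();
frame.addField({ name: 'ts', type: FieldType.time, values: ['1970-01-01T00:00:00Z'] });
frame.addField({ name: 'line', type: FieldType.string, values: ['level=info traceID=abc123 msg=done'] });

enhanceDataFrame(frame, {
  derivedFields: [{ name: 'traceID', matcherRegex: 'traceID=(\\w+)', url: 'https://example.com' }],
});

// frame.fields now contains an extra string field named 'traceID': its first value is the
// captured group ('abc123') and, because url was set, its config.links carries that url.
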
export function rangeQueryResponseToTimeSeries(
response: LokiResponse,
query: LokiRangeQueryRequest,
@@ -377,12 +441,13 @@ export function processRangeQueryResponse(
query: LokiRangeQueryRequest,
responseListLength: number,
limit: number,
config: LokiOptions,
reverse = false
) {
switch (response.data.resultType) {
case LokiResultType.Stream:
return of({
data: lokiStreamsToDataframes(response.data.result, target, limit, reverse),
data: lokiStreamsToDataframes(response.data.result, target, limit, config, reverse),
key: `${target.refId}_log`,
});
......