Commit 6b8d7c89 by David Kaltschmidt

Refactored log stream merging, added types, tests, comments

parent dfc0c505
......@@ -31,15 +31,16 @@ export interface LogSearchMatch {
}
export interface LogRow {
key: string;
entry: string;
key: string; // timestamp + labels
labels: string;
logLevel: LogLevel;
timestamp: string;
searchWords?: string[];
timestamp: string; // ISO with nanosec precision
timeFromNow: string;
timeJs: number;
timeEpochMs: number;
timeLocal: string;
searchWords?: string[];
uniqueLabels?: string;
}
export interface LogsMetaItem {
......@@ -56,11 +57,46 @@ export interface LogsModel {
export interface LogsStream {
labels: string;
entries: LogsStreamEntry[];
parsedLabels: { [key: string]: string };
intervalMs?: number;
search?: string;
parsedLabels?: LogsStreamLabels;
uniqueLabels?: string;
}
export interface LogsStreamEntry {
line: string;
timestamp: string;
}
export interface LogsStreamLabels {
[key: string]: string;
}
export function makeSeriesForLogs(rows: LogRow[], intervalMs: number): TimeSeries[] {
// Graph time series by log level
const seriesByLevel = {};
rows.forEach(row => {
if (!seriesByLevel[row.logLevel]) {
seriesByLevel[row.logLevel] = { lastTs: null, datapoints: [], alias: row.logLevel };
}
const levelSeries = seriesByLevel[row.logLevel];
// Bucket to nearest minute
const time = Math.round(row.timeEpochMs / intervalMs / 10) * intervalMs * 10;
// Entry for time
if (time === levelSeries.lastTs) {
levelSeries.datapoints[levelSeries.datapoints.length - 1][0]++;
} else {
levelSeries.datapoints.push([1, time]);
levelSeries.lastTs = time;
}
});
return Object.keys(seriesByLevel).reduce((acc, level) => {
if (seriesByLevel[level]) {
const gs = new TimeSeries(seriesByLevel[level]);
gs.setColor(LogLevelColor[level]);
acc.push(gs);
}
return acc;
}, []);
}
......@@ -28,6 +28,17 @@ import { DataSource } from 'app/types/datasources';
const MAX_HISTORY_ITEMS = 100;
function getIntervals(range: RawTimeRange, datasource, resolution: number): { interval: string; intervalMs: number } {
if (!datasource || !resolution) {
return { interval: '1s', intervalMs: 1000 };
}
const absoluteRange: RawTimeRange = {
from: parseDate(range.from, false),
to: parseDate(range.to, true),
};
return kbn.calculateInterval(absoluteRange, resolution, datasource.interval);
}
function makeTimeSeriesList(dataList, options) {
return dataList.map((seriesData, index) => {
const datapoints = seriesData.datapoints || [];
......@@ -470,12 +481,7 @@ export class Explore extends React.PureComponent<ExploreProps, ExploreState> {
targetOptions: { format: string; hinting?: boolean; instant?: boolean }
) {
const { datasource, range } = this.state;
const resolution = this.el.offsetWidth;
const absoluteRange: RawTimeRange = {
from: parseDate(range.from, false),
to: parseDate(range.to, true),
};
const { interval, intervalMs } = kbn.calculateInterval(absoluteRange, resolution, datasource.interval);
const { interval, intervalMs } = getIntervals(range, datasource, this.el.offsetWidth);
const targets = [
{
...targetOptions,
......@@ -759,6 +765,7 @@ export class Explore extends React.PureComponent<ExploreProps, ExploreState> {
const tableButtonActive = showingBoth || showingTable ? 'active' : '';
const exploreClass = split ? 'explore explore-split' : 'explore';
const selectedDatasource = datasource ? exploreDatasources.find(d => d.label === datasource.name) : undefined;
const graphRangeIntervals = getIntervals(graphRange, datasource, this.el ? this.el.offsetWidth : 0);
const graphLoading = queryTransactions.some(qt => qt.resultType === 'Graph' && !qt.done);
const tableLoading = queryTransactions.some(qt => qt.resultType === 'Table' && !qt.done);
const logsLoading = queryTransactions.some(qt => qt.resultType === 'Logs' && !qt.done);
......@@ -775,7 +782,8 @@ export class Explore extends React.PureComponent<ExploreProps, ExploreState> {
? datasource.mergeStreams(
_.flatten(
queryTransactions.filter(qt => qt.resultType === 'Logs' && qt.done && qt.result).map(qt => qt.result)
)
),
graphRangeIntervals.intervalMs
)
: undefined;
const loading = queryTransactions.some(qt => !qt.done);
......
......@@ -3,10 +3,10 @@ import _ from 'lodash';
import * as dateMath from 'app/core/utils/datemath';
import LanguageProvider from './language_provider';
import { mergeStreams, processStream } from './result_transformer';
import { LogsStream } from 'app/core/logs_model';
import { mergeStreamsToLogs } from './result_transformer';
import { LogsStream, LogsModel, makeSeriesForLogs } from 'app/core/logs_model';
const DEFAULT_LIMIT = 1000;
export const DEFAULT_LIMIT = 1000;
const DEFAULT_QUERY_PARAMS = {
direction: 'BACKWARD',
......@@ -68,8 +68,10 @@ export default class LoggingDatasource {
return this.backendSrv.datasourceRequest(req);
}
mergeStreams(streams: LogsStream[]) {
return mergeStreams(streams, DEFAULT_LIMIT);
mergeStreams(streams: LogsStream[], intervalMs: number): LogsModel {
const logs = mergeStreamsToLogs(streams);
logs.series = makeSeriesForLogs(logs.rows, intervalMs);
return logs;
}
prepareQueryTarget(target, options) {
......@@ -84,7 +86,7 @@ export default class LoggingDatasource {
};
}
query(options) {
query(options): Promise<{ data: LogsStream[] }> {
const queryTargets = options.targets
.filter(target => target.expr)
.map(target => this.prepareQueryTarget(target, options));
......@@ -96,17 +98,16 @@ export default class LoggingDatasource {
return Promise.all(queries).then((results: any[]) => {
// Flatten streams from multiple queries
const allStreams = results.reduce((acc, response, i) => {
const streams = response.data.streams || [];
const allStreams: LogsStream[] = results.reduce((acc, response, i) => {
const streams: LogsStream[] = response.data.streams || [];
// Inject search for match highlighting
const search = queryTargets[i].regexp;
const search: string = queryTargets[i].regexp;
streams.forEach(s => {
s.search = search;
});
return [...acc, ...streams];
}, []);
const processedStreams = allStreams.map(stream => processStream(stream, DEFAULT_LIMIT, options.intervalMs));
return { data: processedStreams };
return { data: allStreams };
});
}
......
import { LogLevel } from 'app/core/logs_model';
import { LogLevel, LogsStream } from 'app/core/logs_model';
import { findCommonLabels, findUncommonLabels, formatLabels, getLogLevel, parseLabels } from './result_transformer';
import {
findCommonLabels,
findUniqueLabels,
formatLabels,
getLogLevel,
mergeStreamsToLogs,
parseLabels,
} from './result_transformer';
describe('getLoglevel()', () => {
it('returns no log level on empty line', () => {
......@@ -61,16 +68,88 @@ describe('findCommonLabels()', () => {
});
});
describe('findUncommonLabels()', () => {
describe('findUniqueLabels()', () => {
it('returns no uncommon labels on empty sets', () => {
expect(findUncommonLabels({}, {})).toEqual({});
expect(findUniqueLabels({}, {})).toEqual({});
});
it('returns all labels given no common labels', () => {
expect(findUncommonLabels({ foo: '"bar"' }, {})).toEqual({ foo: '"bar"' });
expect(findUniqueLabels({ foo: '"bar"' }, {})).toEqual({ foo: '"bar"' });
});
it('returns all labels except the common labels', () => {
expect(findUncommonLabels({ foo: '"bar"', baz: '"42"' }, { foo: '"bar"' })).toEqual({ baz: '"42"' });
expect(findUniqueLabels({ foo: '"bar"', baz: '"42"' }, { foo: '"bar"' })).toEqual({ baz: '"42"' });
});
});
describe('mergeStreamsToLogs()', () => {
it('returns empty logs given no streams', () => {
expect(mergeStreamsToLogs([]).rows).toEqual([]);
});
it('returns processed logs from single stream', () => {
const stream1: LogsStream = {
labels: '{foo="bar"}',
entries: [
{
line: 'WARN boooo',
timestamp: '1970-01-01T00:00:00Z',
},
],
};
expect(mergeStreamsToLogs([stream1]).rows).toMatchObject([
{
entry: 'WARN boooo',
labels: '{foo="bar"}',
key: 'EK1970-01-01T00:00:00Z{foo="bar"}',
logLevel: 'warn',
uniqueLabels: '',
},
]);
});
it('returns merged logs from multiple streams sorted by time and with unique labels', () => {
const stream1: LogsStream = {
labels: '{foo="bar", baz="1"}',
entries: [
{
line: 'WARN boooo',
timestamp: '1970-01-01T00:00:01Z',
},
],
};
const stream2: LogsStream = {
labels: '{foo="bar", baz="2"}',
entries: [
{
line: 'INFO 1',
timestamp: '1970-01-01T00:00:00Z',
},
{
line: 'INFO 2',
timestamp: '1970-01-01T00:00:02Z',
},
],
};
expect(mergeStreamsToLogs([stream1, stream2]).rows).toMatchObject([
{
entry: 'INFO 2',
labels: '{foo="bar", baz="2"}',
logLevel: 'info',
uniqueLabels: '{baz="2"}',
},
{
entry: 'WARN boooo',
labels: '{foo="bar", baz="1"}',
logLevel: 'warn',
uniqueLabels: '{baz="1"}',
},
{
entry: 'INFO 1',
labels: '{foo="bar", baz="2"}',
logLevel: 'info',
uniqueLabels: '{baz="2"}',
},
]);
});
});
import _ from 'lodash';
import moment from 'moment';
import { LogLevel, LogLevelColor, LogsMetaItem, LogsModel, LogRow, LogsStream } from 'app/core/logs_model';
import { TimeSeries } from 'app/core/core';
import {
LogLevel,
LogsMetaItem,
LogsModel,
LogRow,
LogsStream,
LogsStreamEntry,
LogsStreamLabels,
} from 'app/core/logs_model';
import { DEFAULT_LIMIT } from './datasource';
/**
* Returns the log level of a log line.
* Parse the line for level words. If no level is found, it returns `LogLevel.none`.
*
* Example: `getLogLevel('WARN 1999-12-31 this is great') // LogLevel.warn`
*/
export function getLogLevel(line: string): LogLevel {
if (!line) {
return LogLevel.none;
......@@ -23,9 +37,18 @@ export function getLogLevel(line: string): LogLevel {
return level;
}
/**
* Regexp to extract Prometheus-style labels
*/
const labelRegexp = /\b(\w+)(!?=~?)("[^"\n]*?")/g;
export function parseLabels(labels: string): { [key: string]: string } {
const labelsByKey = {};
/**
* Returns a map of label keys to value from an input selector string.
*
* Example: `parseLabels('{job="foo", instance="bar"}) // {job: "foo", instance: "bar"}`
*/
export function parseLabels(labels: string): LogsStreamLabels {
const labelsByKey: LogsStreamLabels = {};
labels.replace(labelRegexp, (_, key, operator, value) => {
labelsByKey[key] = value;
return '';
......@@ -33,7 +56,10 @@ export function parseLabels(labels: string): { [key: string]: string } {
return labelsByKey;
}
export function findCommonLabels(labelsSets: any[]) {
/**
* Returns a map labels that are common to the given label sets.
*/
export function findCommonLabels(labelsSets: LogsStreamLabels[]): LogsStreamLabels {
return labelsSets.reduce((acc, labels) => {
if (!labels) {
throw new Error('Need parsed labels to find common labels.');
......@@ -59,15 +85,21 @@ export function findCommonLabels(labelsSets: any[]) {
}, undefined);
}
export function findUncommonLabels(labels, commonLabels) {
const uncommonLabels = { ...labels };
/**
* Returns a map of labels that are in `labels`, but not in `commonLabels`.
*/
export function findUniqueLabels(labels: LogsStreamLabels, commonLabels: LogsStreamLabels): LogsStreamLabels {
const uncommonLabels: LogsStreamLabels = { ...labels };
Object.keys(commonLabels).forEach(key => {
delete uncommonLabels[key];
});
return uncommonLabels;
}
export function formatLabels(labels, defaultValue = '') {
/**
* Serializes the given labels to a string.
*/
export function formatLabels(labels: LogsStreamLabels, defaultValue = ''): string {
if (!labels || Object.keys(labels).length === 0) {
return defaultValue;
}
......@@ -76,111 +108,72 @@ export function formatLabels(labels, defaultValue = '') {
return ['{', cleanSelector, '}'].join('');
}
export function processEntry(entry: { line: string; timestamp: string }, stream): LogRow {
export function processEntry(entry: LogsStreamEntry, labels: string, uniqueLabels: string, search: string): LogRow {
const { line, timestamp } = entry;
const { labels } = stream;
// Assumes unique-ness, needs nanosec precision for timestamp
const key = `EK${timestamp}${labels}`;
const time = moment(timestamp);
const timeJs = time.valueOf();
const timeEpochMs = time.valueOf();
const timeFromNow = time.fromNow();
const timeLocal = time.format('YYYY-MM-DD HH:mm:ss');
const logLevel = getLogLevel(line);
return {
key,
labels,
logLevel,
timeFromNow,
timeJs,
timeEpochMs,
timeLocal,
uniqueLabels,
entry: line,
labels: formatLabels(labels),
searchWords: [stream.search],
searchWords: search ? [search] : [],
timestamp: timestamp,
};
}
export function mergeStreams(streams: LogsStream[], limit?: number): LogsModel {
// Find meta data
const commonLabels = findCommonLabels(streams.map(stream => stream.parsedLabels));
const meta: LogsMetaItem[] = [
{
label: 'Common labels',
value: formatLabels(commonLabels),
},
];
let intervalMs;
// Flatten entries of streams
const combinedEntries: LogRow[] = streams.reduce((acc, stream) => {
// Set interval for graphs
intervalMs = stream.intervalMs;
// Overwrite labels to be only the non-common ones
const labels = formatLabels(findUncommonLabels(stream.parsedLabels, commonLabels));
return [
...acc,
...stream.entries.map(entry => ({
...entry,
labels,
})),
];
}, []);
// Graph time series by log level
const seriesByLevel = {};
combinedEntries.forEach(entry => {
if (!seriesByLevel[entry.logLevel]) {
seriesByLevel[entry.logLevel] = { lastTs: null, datapoints: [], alias: entry.logLevel };
}
const levelSeries = seriesByLevel[entry.logLevel];
// Bucket to nearest minute
const time = Math.round(entry.timeJs / intervalMs / 10) * intervalMs * 10;
// Entry for time
if (time === levelSeries.lastTs) {
levelSeries.datapoints[levelSeries.datapoints.length - 1][0]++;
} else {
levelSeries.datapoints.push([1, time]);
levelSeries.lastTs = time;
}
});
const series = Object.keys(seriesByLevel).reduce((acc, level, index) => {
if (seriesByLevel[level]) {
const gs = new TimeSeries(seriesByLevel[level]);
gs.setColor(LogLevelColor[level]);
acc.push(gs);
}
return acc;
}, []);
const sortedEntries = _.chain(combinedEntries)
export function mergeStreamsToLogs(streams: LogsStream[], limit = DEFAULT_LIMIT): LogsModel {
// Find unique labels for each stream
streams = streams.map(stream => ({
...stream,
parsedLabels: parseLabels(stream.labels),
}));
const commonLabels = findCommonLabels(streams.map(model => model.parsedLabels));
streams = streams.map(stream => ({
...stream,
uniqueLabels: formatLabels(findUniqueLabels(stream.parsedLabels, commonLabels)),
}));
// Merge stream entries into single list of log rows
const sortedRows: LogRow[] = _.chain(streams)
.reduce(
(acc: LogRow[], stream: LogsStream) => [
...acc,
...stream.entries.map(entry => processEntry(entry, stream.labels, stream.uniqueLabels, stream.search)),
],
[]
)
.sortBy('timestamp')
.reverse()
.slice(0, limit || combinedEntries.length)
.value();
meta.push({
label: 'Limit',
value: `${limit} (${sortedEntries.length} returned)`,
});
return { meta, series, rows: sortedEntries };
}
export function processStream(stream: LogsStream, limit?: number, intervalMs?: number): LogsStream {
const sortedEntries: any[] = _.chain(stream.entries)
.map(entry => processEntry(entry, stream))
.sortBy('timestamp')
.reverse()
.slice(0, limit || stream.entries.length)
.value();
// Meta data to display in status
const meta: LogsMetaItem[] = [];
if (_.size(commonLabels) > 0) {
meta.push({
label: 'Common labels',
value: formatLabels(commonLabels),
});
}
if (limit) {
meta.push({
label: 'Limit',
value: `${limit} (${sortedRows.length} returned)`,
});
}
return {
...stream,
intervalMs,
entries: sortedEntries,
parsedLabels: parseLabels(stream.labels),
meta,
rows: sortedRows,
};
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment