Commit 934d93ad by Andrej Ocenas Committed by GitHub

Elastic: Map level field based on config. (#22182)

* Map level field based on config.

* Fix type
parent 10fbabfb
...@@ -312,7 +312,7 @@ export function logSeriesToLogsModel(logSeries: DataFrame[]): LogsModel | undefi ...@@ -312,7 +312,7 @@ export function logSeriesToLogsModel(logSeries: DataFrame[]): LogsModel | undefi
const searchWords = series.meta && series.meta.searchWords ? series.meta.searchWords : []; const searchWords = series.meta && series.meta.searchWords ? series.meta.searchWords : [];
let logLevel = LogLevel.unknown; let logLevel = LogLevel.unknown;
if (logLevelField) { if (logLevelField && logLevelField.values.get(j)) {
logLevel = getLogLevelFromKey(logLevelField.values.get(j)); logLevel = getLogLevelFromKey(logLevelField.values.get(j));
} else if (seriesLogLevel) { } else if (seriesLogLevel) {
logLevel = seriesLogLevel; logLevel = seriesLogLevel;
......
...@@ -416,7 +416,6 @@ export class ElasticResponse { ...@@ -416,7 +416,6 @@ export class ElasticResponse {
getLogs(logMessageField?: string, logLevelField?: string): DataQueryResponse { getLogs(logMessageField?: string, logLevelField?: string): DataQueryResponse {
const dataFrame: DataFrame[] = []; const dataFrame: DataFrame[] = [];
const docs: any[] = [];
for (let n = 0; n < this.response.responses.length; n++) { for (let n = 0; n < this.response.responses.length; n++) {
const response = this.response.responses[n]; const response = this.response.responses[n];
...@@ -424,78 +423,18 @@ export class ElasticResponse { ...@@ -424,78 +423,18 @@ export class ElasticResponse {
throw this.getErrorFromElasticResponse(this.response, response.error); throw this.getErrorFromElasticResponse(this.response, response.error);
} }
// We keep a list of all props so that we can create all the fields in the dataFrame, this can lead const { propNames, docs } = flattenHits(response.hits.hits);
// to wide sparse dataframes in case the scheme is different per document.
let propNames: string[] = [];
for (const hit of response.hits.hits) {
const flattened = hit._source ? flatten(hit._source, null) : {};
const doc = {
_id: hit._id,
_type: hit._type,
_index: hit._index,
_source: { ...flattened },
...flattened,
};
for (const propName of Object.keys(doc)) {
if (propNames.indexOf(propName) === -1) {
propNames.push(propName);
}
}
docs.push(doc);
}
if (docs.length > 0) { if (docs.length > 0) {
propNames = propNames.sort(); const series = createEmptyDataFrame(propNames, this.targets[0].timeField, logMessageField, logLevelField);
const series = new MutableDataFrame({ fields: [] });
series.addField({
name: this.targets[0].timeField,
type: FieldType.time,
});
if (logMessageField) {
series.addField({
name: logMessageField,
type: FieldType.string,
}).parse = (v: any) => {
return v || '';
};
} else {
series.addField({
name: '_source',
type: FieldType.string,
}).parse = (v: any) => {
return JSON.stringify(v, null, 2);
};
}
if (logLevelField) {
series.addField({
name: 'level',
type: FieldType.string,
}).parse = (v: any) => {
return v || '';
};
}
for (const propName of propNames) {
if (propName === this.targets[0].timeField || propName === '_source') {
continue;
}
series.addField({
name: propName,
type: FieldType.string,
}).parse = (v: any) => {
return v || '';
};
}
// Add a row for each document // Add a row for each document
for (const doc of docs) { for (const doc of docs) {
if (logLevelField) {
// Remap level field based on the datasource config. This field is then used in explore to figure out the
// log level. We may rewrite some actual data in the level field if they are different.
doc['level'] = doc[logLevelField];
}
series.add(doc); series.add(doc);
} }
...@@ -522,3 +461,110 @@ export class ElasticResponse { ...@@ -522,3 +461,110 @@ export class ElasticResponse {
return { data: dataFrame }; return { data: dataFrame };
} }
} }
type Doc = {
_id: string;
_type: string;
_index: string;
_source?: any;
};
/**
* Flatten the docs from response mainly the _source part which can be nested. This flattens it so that it is one level
* deep and the keys are: `level1Name.level2Name...`. Also returns list of all properties from all the docs (not all
* docs have to have the same keys).
* @param hits
*/
const flattenHits = (hits: Doc[]): { docs: Array<Record<string, any>>; propNames: string[] } => {
const docs: any[] = [];
// We keep a list of all props so that we can create all the fields in the dataFrame, this can lead
// to wide sparse dataframes in case the scheme is different per document.
let propNames: string[] = [];
for (const hit of hits) {
const flattened = hit._source ? flatten(hit._source, null) : {};
const doc = {
_id: hit._id,
_type: hit._type,
_index: hit._index,
_source: { ...flattened },
...flattened,
};
for (const propName of Object.keys(doc)) {
if (propNames.indexOf(propName) === -1) {
propNames.push(propName);
}
}
docs.push(doc);
}
propNames.sort();
return { docs, propNames };
};
/**
* Create empty dataframe but with created fields. Fields are based from propNames (should be from the response) and
* also from configuration specified fields for message, time, and level.
* @param propNames
* @param timeField
* @param logMessageField
* @param logLevelField
*/
const createEmptyDataFrame = (
propNames: string[],
timeField: string,
logMessageField?: string,
logLevelField?: string
): MutableDataFrame => {
const series = new MutableDataFrame({ fields: [] });
series.addField({
name: timeField,
type: FieldType.time,
});
if (logMessageField) {
series.addField({
name: logMessageField,
type: FieldType.string,
}).parse = (v: any) => {
return v || '';
};
} else {
series.addField({
name: '_source',
type: FieldType.string,
}).parse = (v: any) => {
return JSON.stringify(v, null, 2);
};
}
if (logLevelField) {
series.addField({
name: 'level',
type: FieldType.string,
}).parse = (v: any) => {
return v || '';
};
}
const fieldNames = series.fields.map(field => field.name);
for (const propName of propNames) {
// Do not duplicate fields. This can mean that we will shadow some fields.
if (fieldNames.includes(propName)) {
continue;
}
series.addField({
name: propName,
type: FieldType.string,
}).parse = (v: any) => {
return v || '';
};
}
return series;
};
import { DataFrameView, KeyValue, MutableDataFrame } from '@grafana/data'; import { DataFrameView, FieldCache, KeyValue, MutableDataFrame } from '@grafana/data';
import { ElasticResponse } from '../elastic_response'; import { ElasticResponse } from '../elastic_response';
import flatten from 'app/core/utils/flatten';
describe('ElasticResponse', () => { describe('ElasticResponse', () => {
let targets; let targets;
...@@ -827,71 +828,76 @@ describe('ElasticResponse', () => { ...@@ -827,71 +828,76 @@ describe('ElasticResponse', () => {
}); });
describe('simple logs query and count', () => { describe('simple logs query and count', () => {
beforeEach(() => { const targets: any = [
targets = [ {
refId: 'A',
metrics: [{ type: 'count', id: '1' }],
bucketAggs: [{ type: 'date_histogram', settings: { interval: 'auto' }, id: '2' }],
context: 'explore',
interval: '10s',
isLogsQuery: true,
key: 'Q-1561369883389-0.7611823271062786-0',
liveStreaming: false,
maxDataPoints: 1620,
query: '',
timeField: '@timestamp',
},
];
const response = {
responses: [
{ {
refId: 'A', aggregations: {
metrics: [{ type: 'count', id: '1' }], '2': {
bucketAggs: [{ type: 'date_histogram', settings: { interval: 'auto' }, id: '2' }], buckets: [
context: 'explore',
interval: '10s',
isLogsQuery: true,
key: 'Q-1561369883389-0.7611823271062786-0',
liveStreaming: false,
maxDataPoints: 1620,
query: '',
timeField: '@timestamp',
},
];
response = {
responses: [
{
aggregations: {
'2': {
buckets: [
{
doc_count: 10,
key: 1000,
},
{
doc_count: 15,
key: 2000,
},
],
},
},
hits: {
hits: [
{ {
_id: 'fdsfs', doc_count: 10,
_type: '_doc', key: 1000,
_index: 'mock-index',
_source: {
'@timestamp': '2019-06-24T09:51:19.765Z',
host: 'djisaodjsoad',
message: 'hello, i am a message',
},
}, },
{ {
_id: 'kdospaidopa', doc_count: 15,
_type: '_doc', key: 2000,
_index: 'mock-index',
_source: {
'@timestamp': '2019-06-24T09:52:19.765Z',
host: 'dsalkdakdop',
message: 'hello, i am also message',
},
}, },
], ],
}, },
}, },
], hits: {
}; hits: [
{
result = new ElasticResponse(targets, response).getLogs(); _id: 'fdsfs',
}); _type: '_doc',
_index: 'mock-index',
_source: {
'@timestamp': '2019-06-24T09:51:19.765Z',
host: 'djisaodjsoad',
message: 'hello, i am a message',
level: 'debug',
fields: {
lvl: 'debug',
},
},
},
{
_id: 'kdospaidopa',
_type: '_doc',
_index: 'mock-index',
_source: {
'@timestamp': '2019-06-24T09:52:19.765Z',
host: 'dsalkdakdop',
message: 'hello, i am also message',
level: 'error',
fields: {
lvl: 'info',
},
},
},
],
},
},
],
};
it('should return histogram aggregation and documents', () => { it('should return histogram aggregation and documents', () => {
const result = new ElasticResponse(targets, response).getLogs();
expect(result.data.length).toBe(2); expect(result.data.length).toBe(2);
const logResults = result.data[0] as MutableDataFrame; const logResults = result.data[0] as MutableDataFrame;
const fields = logResults.fields.map(f => { const fields = logResults.fields.map(f => {
...@@ -911,7 +917,7 @@ describe('ElasticResponse', () => { ...@@ -911,7 +917,7 @@ describe('ElasticResponse', () => {
expect(r._id).toEqual(response.responses[0].hits.hits[i]._id); expect(r._id).toEqual(response.responses[0].hits.hits[i]._id);
expect(r._type).toEqual(response.responses[0].hits.hits[i]._type); expect(r._type).toEqual(response.responses[0].hits.hits[i]._type);
expect(r._index).toEqual(response.responses[0].hits.hits[i]._index); expect(r._index).toEqual(response.responses[0].hits.hits[i]._index);
expect(r._source).toEqual(response.responses[0].hits.hits[i]._source); expect(r._source).toEqual(flatten(response.responses[0].hits.hits[i]._source, null));
} }
// Make a map from the histogram results // Make a map from the histogram results
...@@ -927,5 +933,19 @@ describe('ElasticResponse', () => { ...@@ -927,5 +933,19 @@ describe('ElasticResponse', () => {
expect(hist[bucket.key]).toEqual(bucket.doc_count); expect(hist[bucket.key]).toEqual(bucket.doc_count);
}); });
}); });
it('should map levels field', () => {
const result = new ElasticResponse(targets, response).getLogs(undefined, 'level');
const fieldCache = new FieldCache(result.data[0]);
const field = fieldCache.getFieldByName('level');
expect(field.values.toArray()).toEqual(['debug', 'error']);
});
it('should re map levels field to new field', () => {
const result = new ElasticResponse(targets, response).getLogs(undefined, 'fields.lvl');
const fieldCache = new FieldCache(result.data[0]);
const field = fieldCache.getFieldByName('level');
expect(field.values.toArray()).toEqual(['debug', 'info']);
});
}); });
}); });
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment