Commit c46d01dc by Marcus Efraimsson, committed by GitHub

Merge pull request #14090 from marefr/8843_pipeline_doc_count

Fix pipeline aggregations on doc count
parents ed2a9b8e 18810ca7
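
In effect: when a pipeline aggregation (moving_avg, derivative, etc.) is applied on top of a count metric, the generated Elasticsearch query has to reference the date histogram's built-in doc count via the "_count" bucket path, because a plain count has no sibling metric aggregation of its own to point at. A minimal sketch of the intended aggregation shape (illustrative values taken from the tests in this diff, not the exact builder output):

// Moving average applied on a count ("doc count") metric: the pipeline
// aggregation's buckets_path is the built-in "_count" path rather than the
// id of the count metric.
const aggs = {
  '4': {
    date_histogram: { field: '@timestamp' },
    aggs: {
      '2': {
        moving_avg: { buckets_path: '_count' },
      },
    },
  },
};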
......@@ -35,7 +35,7 @@ datasources:
tsdbResolution: 1
tsdbVersion: 1
- name: gdev-elasticsearch-metrics
- name: gdev-elasticsearch-v2-metrics
type: elasticsearch
access: proxy
database: "[metrics-]YYYY.MM.DD"
......@@ -43,6 +43,57 @@ datasources:
jsonData:
interval: Daily
timeField: "@timestamp"
esVersion: 2
- name: gdev-elasticsearch-v2-logs
type: elasticsearch
access: proxy
database: "[logs-]YYYY.MM.DD"
url: http://localhost:9200
jsonData:
interval: Daily
timeField: "@timestamp"
esVersion: 2
- name: gdev-elasticsearch-v5-metrics
type: elasticsearch
access: proxy
database: "[metrics-]YYYY.MM.DD"
url: http://localhost:10200
jsonData:
interval: Daily
timeField: "@timestamp"
esVersion: 5
- name: gdev-elasticsearch-v5-logs
type: elasticsearch
access: proxy
database: "[logs-]YYYY.MM.DD"
url: http://localhost:10200
jsonData:
interval: Daily
timeField: "@timestamp"
esVersion: 5
- name: gdev-elasticsearch-v6-metrics
type: elasticsearch
access: proxy
database: "[metrics-]YYYY.MM.DD"
url: http://localhost:11200
jsonData:
interval: Daily
timeField: "@timestamp"
esVersion: 60
- name: gdev-elasticsearch-v6-logs
type: elasticsearch
access: proxy
database: "[logs-]YYYY.MM.DD"
url: http://localhost:11200
jsonData:
interval: Daily
timeField: "@timestamp"
esVersion: 60
- name: gdev-mysql
type: mysql
......
This source diff could not be displayed because it is too large.
......@@ -5,7 +5,7 @@
- "9200:9200"
- "9300:9300"
volumes:
- ./blocks/elastic/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
- ./docker/blocks/elastic/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
fake-elastic-data:
image: grafana/fake-data-gen
......
......@@ -73,5 +73,8 @@ func isPipelineAgg(metricType string) bool {
func describeMetric(metricType, field string) string {
text := metricAggType[metricType]
if metricType == countType {
return text
}
return text + " " + field
}
......@@ -89,15 +89,29 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) {
}
for _, m := range q.Metrics {
if m.Type == "count" {
if m.Type == countType {
continue
}
if isPipelineAgg(m.Type) {
if _, err := strconv.Atoi(m.PipelineAggregate); err == nil {
aggBuilder.Pipeline(m.ID, m.Type, m.PipelineAggregate, func(a *es.PipelineAggregation) {
var appliedAgg *MetricAgg
for _, pipelineMetric := range q.Metrics {
if pipelineMetric.ID == m.PipelineAggregate {
appliedAgg = pipelineMetric
break
}
}
if appliedAgg != nil {
bucketPath := m.PipelineAggregate
if appliedAgg.Type == countType {
bucketPath = "_count"
}
aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
a.Settings = m.Settings.MustMap()
})
}
} else {
continue
}
......
......@@ -418,6 +418,38 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
So(pl.BucketPath, ShouldEqual, "3")
})
Convey("With moving average doc count", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [
{ "id": "3", "type": "count", "field": "select field" },
{
"id": "2",
"type": "moving_avg",
"field": "3",
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "4")
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
So(firstLevel.Aggregation.Aggs, ShouldHaveLength, 1)
movingAvgAgg := firstLevel.Aggregation.Aggs[0]
So(movingAvgAgg.Key, ShouldEqual, "2")
So(movingAvgAgg.Aggregation.Type, ShouldEqual, "moving_avg")
pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
So(pl.BucketPath, ShouldEqual, "_count")
})
Convey("With broken moving average", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
......@@ -483,6 +515,34 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
So(plAgg.BucketPath, ShouldEqual, "3")
})
Convey("With derivative doc count", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [
{ "id": "3", "type": "count", "field": "select field" },
{
"id": "2",
"type": "derivative",
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "4")
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
derivativeAgg := firstLevel.Aggregation.Aggs[0]
So(derivativeAgg.Key, ShouldEqual, "2")
plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
So(plAgg.BucketPath, ShouldEqual, "_count")
})
})
}
......
......@@ -270,7 +270,14 @@ export class ElasticQueryBuilder {
if (queryDef.isPipelineAgg(metric.type)) {
if (metric.pipelineAgg && /^\d*$/.test(metric.pipelineAgg)) {
const appliedAgg = queryDef.findMetricById(target.metrics, metric.pipelineAgg);
if (appliedAgg) {
if (appliedAgg.type === 'count') {
metricAgg = { buckets_path: '_count' };
} else {
metricAgg = { buckets_path: metric.pipelineAgg };
}
}
} else {
continue;
}
......
......@@ -213,6 +213,9 @@ export function describeOrder(order) {
export function describeMetric(metric) {
const def = _.find(metricAggTypes, { value: metric.type });
if (!def.requiresField && !isPipelineAgg(metric.type)) {
return def.text;
}
return def.text + ' ' + metric.field;
}
......@@ -236,3 +239,7 @@ export function defaultMetricAgg() {
export function defaultBucketAgg() {
return { type: 'date_histogram', id: '2', settings: { interval: 'auto' } };
}
export const findMetricById = (metrics: any[], id: any) => {
return _.find(metrics, { id: id });
};
......@@ -250,6 +250,31 @@ describe('ElasticQueryBuilder', () => {
expect(firstLevel.aggs['2'].moving_avg.buckets_path).toBe('3');
});
it('with moving average doc count', () => {
const query = builder.build({
metrics: [
{
id: '3',
type: 'count',
field: 'select field',
},
{
id: '2',
type: 'moving_avg',
field: '3',
pipelineAgg: '3',
},
],
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '4' }],
});
const firstLevel = query.aggs['4'];
expect(firstLevel.aggs['2']).not.toBe(undefined);
expect(firstLevel.aggs['2'].moving_avg).not.toBe(undefined);
expect(firstLevel.aggs['2'].moving_avg.buckets_path).toBe('_count');
});
it('with broken moving average', () => {
const query = builder.build({
metrics: [
......@@ -304,6 +329,30 @@ describe('ElasticQueryBuilder', () => {
expect(firstLevel.aggs['2'].derivative.buckets_path).toBe('3');
});
it('with derivative doc count', () => {
const query = builder.build({
metrics: [
{
id: '3',
type: 'count',
field: 'select field',
},
{
id: '2',
type: 'derivative',
pipelineAgg: '3',
},
],
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '4' }],
});
const firstLevel = query.aggs['4'];
expect(firstLevel.aggs['2']).not.toBe(undefined);
expect(firstLevel.aggs['2'].derivative).not.toBe(undefined);
expect(firstLevel.aggs['2'].derivative.buckets_path).toBe('_count');
});
it('with histogram', () => {
const query = builder.build({
metrics: [{ id: '1', type: 'count' }],
......