Commit 130e4aa6 by Marcus Efraimsson Committed by GitHub

Merge pull request #14608 from marefr/es_bucket_script

Fix support bucket script pipeline aggregations
parents 574760c7 b45f72a1
...@@ -292,7 +292,7 @@ func (a *MetricAggregation) MarshalJSON() ([]byte, error) { ...@@ -292,7 +292,7 @@ func (a *MetricAggregation) MarshalJSON() ([]byte, error) {
// PipelineAggregation represents a metric aggregation // PipelineAggregation represents a metric aggregation
type PipelineAggregation struct { type PipelineAggregation struct {
BucketPath string BucketPath interface{}
Settings map[string]interface{} Settings map[string]interface{}
} }
......
...@@ -268,7 +268,7 @@ type AggBuilder interface { ...@@ -268,7 +268,7 @@ type AggBuilder interface {
Filters(key string, fn func(a *FiltersAggregation, b AggBuilder)) AggBuilder Filters(key string, fn func(a *FiltersAggregation, b AggBuilder)) AggBuilder
GeoHashGrid(key, field string, fn func(a *GeoHashGridAggregation, b AggBuilder)) AggBuilder GeoHashGrid(key, field string, fn func(a *GeoHashGridAggregation, b AggBuilder)) AggBuilder
Metric(key, metricType, field string, fn func(a *MetricAggregation)) AggBuilder Metric(key, metricType, field string, fn func(a *MetricAggregation)) AggBuilder
Pipeline(key, pipelineType, bucketPath string, fn func(a *PipelineAggregation)) AggBuilder Pipeline(key, pipelineType string, bucketPath interface{}, fn func(a *PipelineAggregation)) AggBuilder
Build() (AggArray, error) Build() (AggArray, error)
} }
...@@ -438,7 +438,7 @@ func (b *aggBuilderImpl) Metric(key, metricType, field string, fn func(a *Metric ...@@ -438,7 +438,7 @@ func (b *aggBuilderImpl) Metric(key, metricType, field string, fn func(a *Metric
return b return b
} }
func (b *aggBuilderImpl) Pipeline(key, pipelineType, bucketPath string, fn func(a *PipelineAggregation)) AggBuilder { func (b *aggBuilderImpl) Pipeline(key, pipelineType string, bucketPath interface{}, fn func(a *PipelineAggregation)) AggBuilder {
innerAgg := &PipelineAggregation{ innerAgg := &PipelineAggregation{
BucketPath: bucketPath, BucketPath: bucketPath,
Settings: make(map[string]interface{}), Settings: make(map[string]interface{}),
......
...@@ -25,13 +25,14 @@ type BucketAgg struct { ...@@ -25,13 +25,14 @@ type BucketAgg struct {
// MetricAgg represents a metric aggregation of the time series query model of the datasource // MetricAgg represents a metric aggregation of the time series query model of the datasource
type MetricAgg struct { type MetricAgg struct {
Field string `json:"field"` Field string `json:"field"`
Hide bool `json:"hide"` Hide bool `json:"hide"`
ID string `json:"id"` ID string `json:"id"`
PipelineAggregate string `json:"pipelineAgg"` PipelineAggregate string `json:"pipelineAgg"`
Settings *simplejson.Json `json:"settings"` PipelineVariables map[string]string `json:"pipelineVariables"`
Meta *simplejson.Json `json:"meta"` Settings *simplejson.Json `json:"settings"`
Type string `json:"type"` Meta *simplejson.Json `json:"meta"`
Type string `json:"type"`
} }
var metricAggType = map[string]string{ var metricAggType = map[string]string{
...@@ -45,6 +46,7 @@ var metricAggType = map[string]string{ ...@@ -45,6 +46,7 @@ var metricAggType = map[string]string{
"cardinality": "Unique Count", "cardinality": "Unique Count",
"moving_avg": "Moving Average", "moving_avg": "Moving Average",
"derivative": "Derivative", "derivative": "Derivative",
"bucket_script": "Bucket Script",
"raw_document": "Raw Document", "raw_document": "Raw Document",
} }
...@@ -60,8 +62,13 @@ var extendedStats = map[string]string{ ...@@ -60,8 +62,13 @@ var extendedStats = map[string]string{
} }
var pipelineAggType = map[string]string{ var pipelineAggType = map[string]string{
"moving_avg": "moving_avg", "moving_avg": "moving_avg",
"derivative": "derivative", "derivative": "derivative",
"bucket_script": "bucket_script",
}
var pipelineAggWithMultipleBucketPathsType = map[string]string{
"bucket_script": "bucket_script",
} }
func isPipelineAgg(metricType string) bool { func isPipelineAgg(metricType string) bool {
...@@ -71,6 +78,13 @@ func isPipelineAgg(metricType string) bool { ...@@ -71,6 +78,13 @@ func isPipelineAgg(metricType string) bool {
return false return false
} }
func isPipelineAggWithMultipleBucketPaths(metricType string) bool {
if _, ok := pipelineAggWithMultipleBucketPathsType[metricType]; ok {
return true
}
return false
}
func describeMetric(metricType, field string) string { func describeMetric(metricType, field string) string {
text := metricAggType[metricType] text := metricAggType[metricType]
if metricType == countType { if metricType == countType {
......
...@@ -260,6 +260,7 @@ func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, ...@@ -260,6 +260,7 @@ func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query,
newSeries.Tags["metric"] = metric.Type newSeries.Tags["metric"] = metric.Type
newSeries.Tags["field"] = metric.Field newSeries.Tags["field"] = metric.Field
newSeries.Tags["metricId"] = metric.ID
for _, v := range esAgg.Get("buckets").MustArray() { for _, v := range esAgg.Get("buckets").MustArray() {
bucket := simplejson.NewFromAny(v) bucket := simplejson.NewFromAny(v)
key := castToNullFloat(bucket.Get("key")) key := castToNullFloat(bucket.Get("key"))
...@@ -459,20 +460,42 @@ func (rp *responseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, ...@@ -459,20 +460,42 @@ func (rp *responseParser) getSeriesName(series *tsdb.TimeSeries, target *Query,
} }
// todo, if field and pipelineAgg // todo, if field and pipelineAgg
if field != "" && isPipelineAgg(metricType) { if field != "" && isPipelineAgg(metricType) {
found := false if isPipelineAggWithMultipleBucketPaths(metricType) {
for _, metric := range target.Metrics { metricID := ""
if metric.ID == field { if v, ok := series.Tags["metricId"]; ok {
metricName += " " + describeMetric(metric.Type, field) metricID = v
found = true }
for _, metric := range target.Metrics {
if metric.ID == metricID {
metricName = metric.Settings.Get("script").MustString()
for name, pipelineAgg := range metric.PipelineVariables {
for _, m := range target.Metrics {
if m.ID == pipelineAgg {
metricName = strings.Replace(metricName, "params."+name, describeMetric(m.Type, m.Field), -1)
}
}
}
}
}
} else {
found := false
for _, metric := range target.Metrics {
if metric.ID == field {
metricName += " " + describeMetric(metric.Type, field)
found = true
}
}
if !found {
metricName = "Unset"
} }
}
if !found {
metricName = "Unset"
} }
} else if field != "" { } else if field != "" {
metricName += " " + field metricName += " " + field
} }
delete(series.Tags, "metricId")
if len(series.Tags) == 0 { if len(series.Tags) == 0 {
return metricName return metricName
} }
......
...@@ -787,6 +787,84 @@ func TestResponseParser(t *testing.T) { ...@@ -787,6 +787,84 @@ func TestResponseParser(t *testing.T) {
So(rows[0][2].(null.Float).Float64, ShouldEqual, 3000) So(rows[0][2].(null.Float).Float64, ShouldEqual, 3000)
}) })
Convey("With bucket_script", func() {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [
{ "id": "1", "type": "sum", "field": "@value" },
{ "id": "3", "type": "max", "field": "@value" },
{
"id": "4",
"field": "select field",
"pipelineVariables": [{ "name": "var1", "pipelineAgg": "1" }, { "name": "var2", "pipelineAgg": "3" }],
"settings": { "script": "params.var1 * params.var2" },
"type": "bucket_script"
}
],
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }]
}`,
}
response := `{
"responses": [
{
"aggregations": {
"2": {
"buckets": [
{
"1": { "value": 2 },
"3": { "value": 3 },
"4": { "value": 6 },
"doc_count": 60,
"key": 1000
},
{
"1": { "value": 3 },
"3": { "value": 4 },
"4": { "value": 12 },
"doc_count": 60,
"key": 2000
}
]
}
}
}
]
}`
rp, err := newResponseParserForTest(targets, response)
So(err, ShouldBeNil)
result, err := rp.getTimeSeries()
So(err, ShouldBeNil)
So(result.Results, ShouldHaveLength, 1)
queryRes := result.Results["A"]
So(queryRes, ShouldNotBeNil)
So(queryRes.Series, ShouldHaveLength, 3)
seriesOne := queryRes.Series[0]
So(seriesOne.Name, ShouldEqual, "Sum @value")
So(seriesOne.Points, ShouldHaveLength, 2)
So(seriesOne.Points[0][0].Float64, ShouldEqual, 2)
So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesOne.Points[1][0].Float64, ShouldEqual, 3)
So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000)
seriesTwo := queryRes.Series[1]
So(seriesTwo.Name, ShouldEqual, "Max @value")
So(seriesTwo.Points, ShouldHaveLength, 2)
So(seriesTwo.Points[0][0].Float64, ShouldEqual, 3)
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesTwo.Points[1][0].Float64, ShouldEqual, 4)
So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000)
seriesThree := queryRes.Series[2]
So(seriesThree.Name, ShouldEqual, "Sum @value * Max @value")
So(seriesThree.Points, ShouldHaveLength, 2)
So(seriesThree.Points[0][0].Float64, ShouldEqual, 6)
So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000)
So(seriesThree.Points[1][0].Float64, ShouldEqual, 12)
So(seriesThree.Points[1][1].Float64, ShouldEqual, 2000)
})
// Convey("Raw documents query", func() { // Convey("Raw documents query", func() {
// targets := map[string]string{ // targets := map[string]string{
// "A": `{ // "A": `{
......
...@@ -94,26 +94,56 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) { ...@@ -94,26 +94,56 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) {
} }
if isPipelineAgg(m.Type) { if isPipelineAgg(m.Type) {
if _, err := strconv.Atoi(m.PipelineAggregate); err == nil { if isPipelineAggWithMultipleBucketPaths(m.Type) {
var appliedAgg *MetricAgg if len(m.PipelineVariables) > 0 {
for _, pipelineMetric := range q.Metrics { bucketPaths := map[string]interface{}{}
if pipelineMetric.ID == m.PipelineAggregate { for name, pipelineAgg := range m.PipelineVariables {
appliedAgg = pipelineMetric if _, err := strconv.Atoi(pipelineAgg); err == nil {
break var appliedAgg *MetricAgg
} for _, pipelineMetric := range q.Metrics {
} if pipelineMetric.ID == pipelineAgg {
if appliedAgg != nil { appliedAgg = pipelineMetric
bucketPath := m.PipelineAggregate break
if appliedAgg.Type == countType { }
bucketPath = "_count" }
if appliedAgg != nil {
if appliedAgg.Type == countType {
bucketPaths[name] = "_count"
} else {
bucketPaths[name] = pipelineAgg
}
}
}
} }
aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) { aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) {
a.Settings = m.Settings.MustMap() a.Settings = m.Settings.MustMap()
}) })
} else {
continue
} }
} else { } else {
continue if _, err := strconv.Atoi(m.PipelineAggregate); err == nil {
var appliedAgg *MetricAgg
for _, pipelineMetric := range q.Metrics {
if pipelineMetric.ID == m.PipelineAggregate {
appliedAgg = pipelineMetric
break
}
}
if appliedAgg != nil {
bucketPath := m.PipelineAggregate
if appliedAgg.Type == countType {
bucketPath = "_count"
}
aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
a.Settings = m.Settings.MustMap()
})
}
} else {
continue
}
} }
} else { } else {
aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) { aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
...@@ -328,12 +358,20 @@ func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricA ...@@ -328,12 +358,20 @@ func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricA
metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString() metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap()) metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap())
metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap()) metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap())
metric.Type, err = metricJSON.Get("type").String() metric.Type, err = metricJSON.Get("type").String()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if isPipelineAggWithMultipleBucketPaths(metric.Type) {
metric.PipelineVariables = map[string]string{}
pvArr := metricJSON.Get("pipelineVariables").MustArray()
for _, v := range pvArr {
kv := v.(map[string]interface{})
metric.PipelineVariables[kv["name"].(string)] = kv["pipelineAgg"].(string)
}
}
result = append(result, metric) result = append(result, metric)
} }
return result, nil return result, nil
......
...@@ -543,6 +543,77 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { ...@@ -543,6 +543,77 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation) plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
So(plAgg.BucketPath, ShouldEqual, "_count") So(plAgg.BucketPath, ShouldEqual, "_count")
}) })
Convey("With bucket_script", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [
{ "id": "3", "type": "sum", "field": "@value" },
{ "id": "5", "type": "max", "field": "@value" },
{
"id": "2",
"type": "bucket_script",
"pipelineVariables": [
{ "name": "var1", "pipelineAgg": "3" },
{ "name": "var2", "pipelineAgg": "5" }
],
"settings": { "script": "params.var1 * params.var2" }
}
]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "4")
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
bucketScriptAgg := firstLevel.Aggregation.Aggs[2]
So(bucketScriptAgg.Key, ShouldEqual, "2")
plAgg := bucketScriptAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
So(plAgg.BucketPath.(map[string]interface{}), ShouldResemble, map[string]interface{}{
"var1": "3",
"var2": "5",
})
})
Convey("With bucket_script doc count", func() {
c := newFakeClient(5)
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [
{ "id": "3", "type": "count", "field": "select field" },
{
"id": "2",
"type": "bucket_script",
"pipelineVariables": [
{ "name": "var1", "pipelineAgg": "3" }
],
"settings": { "script": "params.var1 * 1000" }
}
]
}`, from, to, 15*time.Second)
So(err, ShouldBeNil)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
So(firstLevel.Key, ShouldEqual, "4")
So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram")
bucketScriptAgg := firstLevel.Aggregation.Aggs[0]
So(bucketScriptAgg.Key, ShouldEqual, "2")
plAgg := bucketScriptAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
So(plAgg.BucketPath.(map[string]interface{}), ShouldResemble, map[string]interface{}{
"var1": "_count",
})
})
}) })
} }
......
...@@ -88,6 +88,7 @@ export class ElasticResponse { ...@@ -88,6 +88,7 @@ export class ElasticResponse {
datapoints: [], datapoints: [],
metric: metric.type, metric: metric.type,
field: metric.field, field: metric.field,
metricId: metric.id,
props: props, props: props,
}; };
for (i = 0; i < esAgg.buckets.length; i++) { for (i = 0; i < esAgg.buckets.length; i++) {
...@@ -240,7 +241,7 @@ export class ElasticResponse { ...@@ -240,7 +241,7 @@ export class ElasticResponse {
return metricName; return metricName;
} }
if (group === 'field') { if (group === 'field') {
return series.field; return series.field || '';
} }
return match; return match;
...@@ -248,11 +249,27 @@ export class ElasticResponse { ...@@ -248,11 +249,27 @@ export class ElasticResponse {
} }
if (series.field && queryDef.isPipelineAgg(series.metric)) { if (series.field && queryDef.isPipelineAgg(series.metric)) {
const appliedAgg = _.find(target.metrics, { id: series.field }); if (series.metric && queryDef.isPipelineAggWithMultipleBucketPaths(series.metric)) {
if (appliedAgg) { const agg = _.find(target.metrics, { id: series.metricId });
metricName += ' ' + queryDef.describeMetric(appliedAgg); if (agg && agg.settings.script) {
metricName = agg.settings.script;
for (const pv of agg.pipelineVariables) {
const appliedAgg = _.find(target.metrics, { id: pv.pipelineAgg });
if (appliedAgg) {
metricName = metricName.replace('params.' + pv.name, queryDef.describeMetric(appliedAgg));
}
}
} else {
metricName = 'Unset';
}
} else { } else {
metricName = 'Unset'; const appliedAgg = _.find(target.metrics, { id: series.field });
if (appliedAgg) {
metricName += ' ' + queryDef.describeMetric(appliedAgg);
} else {
metricName = 'Unset';
}
} }
} else if (series.field) { } else if (series.field) {
metricName += ' ' + series.field; metricName += ' ' + series.field;
......
...@@ -35,11 +35,20 @@ export class ElasticMetricAggCtrl { ...@@ -35,11 +35,20 @@ export class ElasticMetricAggCtrl {
$scope.isFirst = $scope.index === 0; $scope.isFirst = $scope.index === 0;
$scope.isSingle = metricAggs.length === 1; $scope.isSingle = metricAggs.length === 1;
$scope.settingsLinkText = ''; $scope.settingsLinkText = '';
$scope.variablesLinkText = '';
$scope.aggDef = _.find($scope.metricAggTypes, { value: $scope.agg.type }); $scope.aggDef = _.find($scope.metricAggTypes, { value: $scope.agg.type });
if (queryDef.isPipelineAgg($scope.agg.type)) { if (queryDef.isPipelineAgg($scope.agg.type)) {
$scope.agg.pipelineAgg = $scope.agg.pipelineAgg || 'select metric'; if (queryDef.isPipelineAggWithMultipleBucketPaths($scope.agg.type)) {
$scope.agg.field = $scope.agg.pipelineAgg; $scope.variablesLinkText = 'Options';
if ($scope.agg.settings.script) {
$scope.variablesLinkText = 'Script: ' + $scope.agg.settings.script.replace(new RegExp('params.', 'g'), '');
}
} else {
$scope.agg.pipelineAgg = $scope.agg.pipelineAgg || 'select metric';
$scope.agg.field = $scope.agg.pipelineAgg;
}
const pipelineOptions = queryDef.getPipelineOptions($scope.agg); const pipelineOptions = queryDef.getPipelineOptions($scope.agg);
if (pipelineOptions.length > 0) { if (pipelineOptions.length > 0) {
...@@ -119,6 +128,10 @@ export class ElasticMetricAggCtrl { ...@@ -119,6 +128,10 @@ export class ElasticMetricAggCtrl {
$scope.updatePipelineAggOptions(); $scope.updatePipelineAggOptions();
}; };
$scope.toggleVariables = () => {
$scope.showVariables = !$scope.showVariables;
};
$scope.onChangeInternal = () => { $scope.onChangeInternal = () => {
$scope.onChange(); $scope.onChange();
}; };
...@@ -152,6 +165,7 @@ export class ElasticMetricAggCtrl { ...@@ -152,6 +165,7 @@ export class ElasticMetricAggCtrl {
$scope.target.bucketAggs = [queryDef.defaultBucketAgg()]; $scope.target.bucketAggs = [queryDef.defaultBucketAgg()];
} }
$scope.showVariables = queryDef.isPipelineAggWithMultipleBucketPaths($scope.agg.type);
$scope.updatePipelineAggOptions(); $scope.updatePipelineAggOptions();
$scope.onChange(); $scope.onChange();
}; };
......
...@@ -13,7 +13,17 @@ ...@@ -13,7 +13,17 @@
<div class="gf-form"> <div class="gf-form">
<metric-segment-model property="agg.type" options="metricAggTypes" on-change="onTypeChange()" custom="false" css-class="width-10"></metric-segment-model> <metric-segment-model property="agg.type" options="metricAggTypes" on-change="onTypeChange()" custom="false" css-class="width-10"></metric-segment-model>
<metric-segment-model ng-if="aggDef.requiresField" property="agg.field" get-options="getFieldsInternal()" on-change="onChange()" css-class="width-12"></metric-segment-model> <metric-segment-model ng-if="aggDef.requiresField" property="agg.field" get-options="getFieldsInternal()" on-change="onChange()" css-class="width-12"></metric-segment-model>
<metric-segment-model ng-if="aggDef.isPipelineAgg" property="agg.pipelineAgg" options="pipelineAggOptions" on-change="onChangeInternal()" custom="false" css-class="width-12"></metric-segment-model> <metric-segment-model ng-if="aggDef.isPipelineAgg && !aggDef.supportsMultipleBucketPaths" property="agg.pipelineAgg" options="pipelineAggOptions" on-change="onChangeInternal()" custom="false" css-class="width-12"></metric-segment-model>
</div>
<div class="gf-form gf-form--grow" ng-if="aggDef.isPipelineAgg && aggDef.supportsMultipleBucketPaths">
<label class="gf-form-label gf-form-label--grow">
<a ng-click="toggleVariables()">
<i class="fa fa-caret-down" ng-show="showVariables"></i>
<i class="fa fa-caret-right" ng-hide="showVariables"></i>
{{variablesLinkText}}
</a>
</label>
</div> </div>
<div class="gf-form gf-form--grow"> <div class="gf-form gf-form--grow">
...@@ -36,6 +46,20 @@ ...@@ -36,6 +46,20 @@
</div> </div>
</div> </div>
<div class="gf-form-group" ng-if="showVariables">
<elastic-pipeline-variables variables="agg.pipelineVariables" options="pipelineAggOptions" on-change="onChangeInternal()" />
<div class="gf-form offset-width-7">
<label class="gf-form-label width-10">
Script
<info-popover mode="right-normal">
Elasticsearch v5.0 and above: Scripting language is Painless. Use <i>params.&lt;var&gt;</i> to reference a variable.<br/><br/>
Elasticsearch pre-v5.0: Scripting language is per default Groovy if not changed. For Groovy use <i>&lt;var&gt;</i> to reference a variable.
</info-popover>
</label>
<input type="text" class="gf-form-input max-width-24" empty-to-null ng-model="agg.settings.script" ng-blur="onChangeInternal()" spellcheck='false' placeholder="params.var1 / params.var2">
</div>
</div>
<div class="gf-form-group" ng-if="showOptions"> <div class="gf-form-group" ng-if="showOptions">
<div class="gf-form offset-width-7" ng-if="agg.type === 'derivative'"> <div class="gf-form offset-width-7" ng-if="agg.type === 'derivative'">
<label class="gf-form-label width-10">Unit</label> <label class="gf-form-label width-10">Unit</label>
...@@ -103,5 +127,5 @@ ...@@ -103,5 +127,5 @@
<tip>The missing parameter defines how documents that are missing a value should be treated. By default they will be ignored but it is also possible to treat them as if they had a value</tip> <tip>The missing parameter defines how documents that are missing a value should be treated. By default they will be ignored but it is also possible to treat them as if they had a value</tip>
</label> </label>
<input type="number" class="gf-form-input max-width-12" empty-to-null ng-model="agg.settings.missing" ng-blur="onChangeInternal()" spellcheck='false'> <input type="number" class="gf-form-input max-width-12" empty-to-null ng-model="agg.settings.missing" ng-blur="onChangeInternal()" spellcheck='false'>
</div> </div>
</div> </div>
<div ng-repeat="var in variables">
<div class="gf-form offset-width-7" ng-if="$index === 0">
<label class="gf-form-label width-10">Variables</label>
<input type="text" class="gf-form-input max-width-12" ng-model="var.name" placeholder="Variable name" ng-blur="onChangeInternal()" spellcheck='false'>
<metric-segment-model property="var.pipelineAgg" options="options" on-change="onChangeInternal()" custom="false" css-class="width-12"></metric-segment-model>
<label class="gf-form-label">
<a class="pointer" ng-click="remove($index)"><i class="fa fa-minus"></i></a>
</label>
<label class="gf-form-label">
<a class="pointer" ng-click="add()"><i class="fa fa-plus"></i></a>
</label>
</div>
<div class="gf-form offset-width-17" ng-if="$index !== 0">
<input type="text" class="gf-form-input max-width-12" ng-model="var.name" placeholder="Variable name" ng-blur="onChangeInternal()" spellcheck='false'>
<metric-segment-model property="var.pipelineAgg" options="options" on-change="onChangeInternal()" custom="false" css-class="width-12"></metric-segment-model>
<label class="gf-form-label">
<a class="pointer" ng-click="remove($index)"><i class="fa fa-minus"></i></a>
</label>
</div>
</div>
import coreModule from 'app/core/core_module';
import _ from 'lodash';
export function elasticPipelineVariables() {
return {
templateUrl: 'public/app/plugins/datasource/elasticsearch/partials/pipeline_variables.html',
controller: 'ElasticPipelineVariablesCtrl',
restrict: 'E',
scope: {
onChange: '&',
variables: '=',
options: '=',
},
};
}
const newVariable = index => {
return {
name: 'var' + index,
pipelineAgg: 'select metric',
};
};
export class ElasticPipelineVariablesCtrl {
constructor($scope) {
$scope.variables = $scope.variables || [newVariable(1)];
$scope.onChangeInternal = () => {
$scope.onChange();
};
$scope.add = () => {
$scope.variables.push(newVariable($scope.variables.length + 1));
$scope.onChange();
};
$scope.remove = index => {
$scope.variables.splice(index, 1);
$scope.onChange();
};
}
}
coreModule.directive('elasticPipelineVariables', elasticPipelineVariables);
coreModule.controller('ElasticPipelineVariablesCtrl', ElasticPipelineVariablesCtrl);
...@@ -189,7 +189,7 @@ export class ElasticQueryBuilder { ...@@ -189,7 +189,7 @@ export class ElasticQueryBuilder {
target.bucketAggs = target.bucketAggs || [queryDef.defaultBucketAgg()]; target.bucketAggs = target.bucketAggs || [queryDef.defaultBucketAgg()];
target.timeField = this.timeField; target.timeField = this.timeField;
let i, nestedAggs, metric; let i, j, pv, nestedAggs, metric;
const query = { const query = {
size: 0, size: 0,
query: { query: {
...@@ -269,17 +269,42 @@ export class ElasticQueryBuilder { ...@@ -269,17 +269,42 @@ export class ElasticQueryBuilder {
let metricAgg = null; let metricAgg = null;
if (queryDef.isPipelineAgg(metric.type)) { if (queryDef.isPipelineAgg(metric.type)) {
if (metric.pipelineAgg && /^\d*$/.test(metric.pipelineAgg)) { if (queryDef.isPipelineAggWithMultipleBucketPaths(metric.type)) {
const appliedAgg = queryDef.findMetricById(target.metrics, metric.pipelineAgg); if (metric.pipelineVariables) {
if (appliedAgg) { metricAgg = {
if (appliedAgg.type === 'count') { buckets_path: {},
metricAgg = { buckets_path: '_count' }; };
} else {
metricAgg = { buckets_path: metric.pipelineAgg }; for (j = 0; j < metric.pipelineVariables.length; j++) {
pv = metric.pipelineVariables[j];
if (pv.name && pv.pipelineAgg && /^\d*$/.test(pv.pipelineAgg)) {
const appliedAgg = queryDef.findMetricById(target.metrics, pv.pipelineAgg);
if (appliedAgg) {
if (appliedAgg.type === 'count') {
metricAgg.buckets_path[pv.name] = '_count';
} else {
metricAgg.buckets_path[pv.name] = pv.pipelineAgg;
}
}
}
} }
} else {
continue;
} }
} else { } else {
continue; if (metric.pipelineAgg && /^\d*$/.test(metric.pipelineAgg)) {
const appliedAgg = queryDef.findMetricById(target.metrics, metric.pipelineAgg);
if (appliedAgg) {
if (appliedAgg.type === 'count') {
metricAgg = { buckets_path: '_count' };
} else {
metricAgg = { buckets_path: metric.pipelineAgg };
}
}
} else {
continue;
}
} }
} else { } else {
metricAgg = { field: metric.field }; metricAgg = { field: metric.field };
......
import './bucket_agg'; import './bucket_agg';
import './metric_agg'; import './metric_agg';
import './pipeline_variables';
import angular from 'angular'; import angular from 'angular';
import _ from 'lodash'; import _ from 'lodash';
...@@ -70,6 +71,9 @@ export class ElasticQueryCtrl extends QueryCtrl { ...@@ -70,6 +71,9 @@ export class ElasticQueryCtrl extends QueryCtrl {
if (aggDef.requiresField) { if (aggDef.requiresField) {
text += metric.field; text += metric.field;
} }
if (aggDef.supportsMultipleBucketPaths) {
text += metric.settings.script.replace(new RegExp('params.', 'g'), '');
}
text += '), '; text += '), ';
}); });
......
...@@ -64,6 +64,14 @@ export const metricAggTypes = [ ...@@ -64,6 +64,14 @@ export const metricAggTypes = [
isPipelineAgg: true, isPipelineAgg: true,
minVersion: 2, minVersion: 2,
}, },
{
text: 'Bucket Script',
value: 'bucket_script',
requiresField: false,
isPipelineAgg: true,
supportsMultipleBucketPaths: true,
minVersion: 2,
},
{ text: 'Raw Document', value: 'raw_document', requiresField: false }, { text: 'Raw Document', value: 'raw_document', requiresField: false },
]; ];
...@@ -128,6 +136,7 @@ export const pipelineOptions = { ...@@ -128,6 +136,7 @@ export const pipelineOptions = {
{ text: 'minimize', default: false }, { text: 'minimize', default: false },
], ],
derivative: [{ text: 'unit', default: undefined }], derivative: [{ text: 'unit', default: undefined }],
bucket_script: [],
}; };
export const movingAvgModelSettings = { export const movingAvgModelSettings = {
...@@ -171,6 +180,14 @@ export function isPipelineAgg(metricType) { ...@@ -171,6 +180,14 @@ export function isPipelineAgg(metricType) {
return false; return false;
} }
export function isPipelineAggWithMultipleBucketPaths(metricType) {
if (metricType) {
return metricAggTypes.find(t => t.value === metricType && t.supportsMultipleBucketPaths) !== undefined;
}
return false;
}
export function getPipelineAggOptions(targets) { export function getPipelineAggOptions(targets) {
const result = []; const result = [];
_.each(targets.metrics, metric => { _.each(targets.metrics, metric => {
......
...@@ -665,4 +665,70 @@ describe('ElasticResponse', () => { ...@@ -665,4 +665,70 @@ describe('ElasticResponse', () => {
expect(result.data[0].datapoints[0].fieldProp).toBe('field'); expect(result.data[0].datapoints[0].fieldProp).toBe('field');
}); });
}); });
describe('with bucket_script ', () => {
let result;
beforeEach(() => {
targets = [
{
refId: 'A',
metrics: [
{ id: '1', type: 'sum', field: '@value' },
{ id: '3', type: 'max', field: '@value' },
{
id: '4',
field: 'select field',
pipelineVariables: [{ name: 'var1', pipelineAgg: '1' }, { name: 'var2', pipelineAgg: '3' }],
settings: { script: 'params.var1 * params.var2' },
type: 'bucket_script',
},
],
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '2' }],
},
];
response = {
responses: [
{
aggregations: {
'2': {
buckets: [
{
1: { value: 2 },
3: { value: 3 },
4: { value: 6 },
doc_count: 60,
key: 1000,
},
{
1: { value: 3 },
3: { value: 4 },
4: { value: 12 },
doc_count: 60,
key: 2000,
},
],
},
},
},
],
};
result = new ElasticResponse(targets, response).getTimeSeries();
});
it('should return 3 series', () => {
expect(result.data.length).toBe(3);
expect(result.data[0].datapoints.length).toBe(2);
expect(result.data[0].target).toBe('Sum @value');
expect(result.data[1].target).toBe('Max @value');
expect(result.data[2].target).toBe('Sum @value * Max @value');
expect(result.data[0].datapoints[0][0]).toBe(2);
expect(result.data[1].datapoints[0][0]).toBe(3);
expect(result.data[2].datapoints[0][0]).toBe(6);
expect(result.data[0].datapoints[1][0]).toBe(3);
expect(result.data[1].datapoints[1][0]).toBe(4);
expect(result.data[2].datapoints[1][0]).toBe(12);
});
});
}); });
...@@ -353,6 +353,83 @@ describe('ElasticQueryBuilder', () => { ...@@ -353,6 +353,83 @@ describe('ElasticQueryBuilder', () => {
expect(firstLevel.aggs['2'].derivative.buckets_path).toBe('_count'); expect(firstLevel.aggs['2'].derivative.buckets_path).toBe('_count');
}); });
it('with bucket_script', () => {
const query = builder.build({
metrics: [
{
id: '1',
type: 'sum',
field: '@value',
},
{
id: '3',
type: 'max',
field: '@value',
},
{
field: 'select field',
id: '4',
meta: {},
pipelineVariables: [
{
name: 'var1',
pipelineAgg: '1',
},
{
name: 'var2',
pipelineAgg: '3',
},
],
settings: {
script: 'params.var1 * params.var2',
},
type: 'bucket_script',
},
],
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '2' }],
});
const firstLevel = query.aggs['2'];
expect(firstLevel.aggs['4']).not.toBe(undefined);
expect(firstLevel.aggs['4'].bucket_script).not.toBe(undefined);
expect(firstLevel.aggs['4'].bucket_script.buckets_path).toMatchObject({ var1: '1', var2: '3' });
});
it('with bucket_script doc count', () => {
const query = builder.build({
metrics: [
{
id: '3',
type: 'count',
field: 'select field',
},
{
field: 'select field',
id: '4',
meta: {},
pipelineVariables: [
{
name: 'var1',
pipelineAgg: '3',
},
],
settings: {
script: 'params.var1 * 1000',
},
type: 'bucket_script',
},
],
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '2' }],
});
const firstLevel = query.aggs['2'];
expect(firstLevel.aggs['4']).not.toBe(undefined);
expect(firstLevel.aggs['4'].bucket_script).not.toBe(undefined);
expect(firstLevel.aggs['4'].bucket_script.buckets_path).toMatchObject({ var1: '_count' });
});
it('with histogram', () => { it('with histogram', () => {
const query = builder.build({ const query = builder.build({
metrics: [{ id: '1', type: 'count' }], metrics: [{ id: '1', type: 'count' }],
......
...@@ -65,6 +65,24 @@ describe('ElasticQueryDef', () => { ...@@ -65,6 +65,24 @@ describe('ElasticQueryDef', () => {
}); });
}); });
describe('isPipelineAggWithMultipleBucketPaths', () => {
describe('bucket_script', () => {
const result = queryDef.isPipelineAggWithMultipleBucketPaths('bucket_script');
test('should have multiple bucket paths support', () => {
expect(result).toBe(true);
});
});
describe('moving_avg', () => {
const result = queryDef.isPipelineAggWithMultipleBucketPaths('moving_avg');
test('should not have multiple bucket paths support', () => {
expect(result).toBe(false);
});
});
});
describe('pipeline aggs depending on esverison', () => { describe('pipeline aggs depending on esverison', () => {
describe('using esversion undefined', () => { describe('using esversion undefined', () => {
test('should not get pipeline aggs', () => { test('should not get pipeline aggs', () => {
...@@ -80,13 +98,13 @@ describe('ElasticQueryDef', () => { ...@@ -80,13 +98,13 @@ describe('ElasticQueryDef', () => {
describe('using esversion 2', () => { describe('using esversion 2', () => {
test('should get pipeline aggs', () => { test('should get pipeline aggs', () => {
expect(queryDef.getMetricAggTypes(2).length).toBe(11); expect(queryDef.getMetricAggTypes(2).length).toBe(12);
}); });
}); });
describe('using esversion 5', () => { describe('using esversion 5', () => {
test('should get pipeline aggs', () => { test('should get pipeline aggs', () => {
expect(queryDef.getMetricAggTypes(5).length).toBe(11); expect(queryDef.getMetricAggTypes(5).length).toBe(12);
}); });
}); });
}); });
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment