Commit 55f1b36e by bergquist

refactor response flow

parent adda84d1
......@@ -20,7 +20,7 @@ func newBatch(dsId int64, queries QuerySlice) *Batch {
}
}
func (bg *Batch) process(ctx context.Context, queryContext *QueryContext) {
func (bg *Batch) process(ctx context.Context, resultChan chan *BatchResult, tsdbQuery *TsdbQuery) {
executor, err := getExecutorFor(bg.Queries[0].DataSource)
if err != nil {
......@@ -32,22 +32,22 @@ func (bg *Batch) process(ctx context.Context, queryContext *QueryContext) {
for _, query := range bg.Queries {
result.QueryResults[query.RefId] = &QueryResult{Error: result.Error}
}
queryContext.ResultsChan <- result
resultChan <- result
return
}
res := executor.Execute(ctx, bg.Queries, queryContext)
res := executor.Execute(ctx, bg.Queries, tsdbQuery)
bg.Done = true
queryContext.ResultsChan <- res
resultChan <- res
}
func (bg *Batch) addQuery(query *Query) {
bg.Queries = append(bg.Queries, query)
}
func (bg *Batch) allDependenciesAreIn(context *QueryContext) bool {
func (bg *Batch) allDependenciesAreIn(res *Response) bool {
for key := range bg.Depends {
if _, exists := context.Results[key]; !exists {
if _, exists := res.Results[key]; !exists {
return false
}
}
......
......@@ -8,7 +8,7 @@ import (
)
type Executor interface {
Execute(ctx context.Context, queries QuerySlice, query *QueryContext) *BatchResult
Execute(ctx context.Context, queries QuerySlice, query *TsdbQuery) *BatchResult
}
var registry map[string]GetExecutorFn
......
......@@ -11,7 +11,7 @@ type FakeExecutor struct {
resultsFn map[string]ResultsFn
}
type ResultsFn func(context *QueryContext) *QueryResult
type ResultsFn func(context *TsdbQuery) *QueryResult
func NewFakeExecutor(dsInfo *models.DataSource) (*FakeExecutor, error) {
return &FakeExecutor{
......@@ -20,7 +20,7 @@ func NewFakeExecutor(dsInfo *models.DataSource) (*FakeExecutor, error) {
}, nil
}
func (e *FakeExecutor) Execute(ctx context.Context, queries QuerySlice, context *QueryContext) *BatchResult {
func (e *FakeExecutor) Execute(ctx context.Context, queries QuerySlice, context *TsdbQuery) *BatchResult {
result := &BatchResult{QueryResults: make(map[string]*QueryResult)}
for _, query := range queries {
if results, has := e.results[query.RefId]; has {
......
......@@ -47,7 +47,7 @@ func init() {
tsdb.RegisterExecutor("graphite", NewGraphiteExecutor)
}
func (e *GraphiteExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
func (e *GraphiteExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
from := "-" + formatTimeRange(context.TimeRange.From)
......
......@@ -47,7 +47,7 @@ func init() {
tsdb.RegisterExecutor("influxdb", NewInfluxDBExecutor)
}
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
query, err := e.getQuery(queries, context)
......@@ -98,7 +98,7 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
return result
}
func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (*Query, error) {
func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.TsdbQuery) (*Query, error) {
for _, v := range queries {
query, err := e.QueryParser.Parse(v.Model, e.DataSource)
......
......@@ -16,7 +16,7 @@ var (
regexpMeasurementPattern *regexp.Regexp = regexp.MustCompile(`^\/.*\/$`)
)
func (query *Query) Build(queryContext *tsdb.QueryContext) (string, error) {
func (query *Query) Build(queryContext *tsdb.TsdbQuery) (string, error) {
var res string
if query.UseRawQuery && query.RawQuery != "" {
......@@ -41,7 +41,7 @@ func (query *Query) Build(queryContext *tsdb.QueryContext) (string, error) {
return res, nil
}
func getDefinedInterval(query *Query, queryContext *tsdb.QueryContext) (*tsdb.Interval, error) {
func getDefinedInterval(query *Query, queryContext *tsdb.TsdbQuery) (*tsdb.Interval, error) {
defaultInterval := tsdb.CalculateInterval(queryContext.TimeRange)
if query.Interval == "" {
......@@ -104,7 +104,7 @@ func (query *Query) renderTags() []string {
return res
}
func (query *Query) renderTimeFilter(queryContext *tsdb.QueryContext) string {
func (query *Query) renderTimeFilter(queryContext *tsdb.TsdbQuery) string {
from := "now() - " + queryContext.TimeRange.From
to := ""
......@@ -115,7 +115,7 @@ func (query *Query) renderTimeFilter(queryContext *tsdb.QueryContext) string {
return fmt.Sprintf("time > %s%s", from, to)
}
func (query *Query) renderSelectors(queryContext *tsdb.QueryContext) string {
func (query *Query) renderSelectors(queryContext *tsdb.TsdbQuery) string {
res := "SELECT "
var selectors []string
......@@ -163,7 +163,7 @@ func (query *Query) renderWhereClause() string {
return res
}
func (query *Query) renderGroupBy(queryContext *tsdb.QueryContext) string {
func (query *Query) renderGroupBy(queryContext *tsdb.TsdbQuery) string {
groupBy := ""
for i, group := range query.GroupBy {
if i == 0 {
......
......@@ -15,7 +15,7 @@ type DefinitionParameters struct {
}
type QueryDefinition struct {
Renderer func(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string
Renderer func(query *Query, queryContext *tsdb.TsdbQuery, part *QueryPart, innerExpr string) string
Params []DefinitionParameters
}
......@@ -94,14 +94,14 @@ func init() {
renders["alias"] = QueryDefinition{Renderer: aliasRenderer}
}
func fieldRenderer(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
func fieldRenderer(query *Query, queryContext *tsdb.TsdbQuery, part *QueryPart, innerExpr string) string {
if part.Params[0] == "*" {
return "*"
}
return fmt.Sprintf(`"%s"`, part.Params[0])
}
func functionRenderer(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
func functionRenderer(query *Query, queryContext *tsdb.TsdbQuery, part *QueryPart, innerExpr string) string {
for i, param := range part.Params {
if part.Type == "time" && param == "auto" {
part.Params[i] = "$__interval"
......@@ -117,15 +117,15 @@ func functionRenderer(query *Query, queryContext *tsdb.QueryContext, part *Query
return fmt.Sprintf("%s(%s)", part.Type, params)
}
func suffixRenderer(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
func suffixRenderer(query *Query, queryContext *tsdb.TsdbQuery, part *QueryPart, innerExpr string) string {
return fmt.Sprintf("%s %s", innerExpr, part.Params[0])
}
func aliasRenderer(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
func aliasRenderer(query *Query, queryContext *tsdb.TsdbQuery, part *QueryPart, innerExpr string) string {
return fmt.Sprintf(`%s AS "%s"`, innerExpr, part.Params[0])
}
func (r QueryDefinition) Render(query *Query, queryContext *tsdb.QueryContext, part *QueryPart, innerExpr string) string {
func (r QueryDefinition) Render(query *Query, queryContext *tsdb.TsdbQuery, part *QueryPart, innerExpr string) string {
return r.Renderer(query, queryContext, part, innerExpr)
}
......@@ -149,6 +149,6 @@ type QueryPart struct {
Params []string
}
func (qp *QueryPart) Render(query *Query, queryContext *tsdb.QueryContext, expr string) string {
func (qp *QueryPart) Render(query *Query, queryContext *tsdb.TsdbQuery, expr string) string {
return qp.Def.Renderer(query, queryContext, qp, expr)
}
......@@ -10,7 +10,7 @@ import (
func TestInfluxdbQueryPart(t *testing.T) {
Convey("Influxdb query parts", t, func() {
queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("5m", "now")}
queryContext := &tsdb.TsdbQuery{TimeRange: tsdb.NewTimeRange("5m", "now")}
query := &Query{}
Convey("render field ", func() {
......
......@@ -28,7 +28,7 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="}
tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"}
queryContext := &tsdb.QueryContext{
queryContext := &tsdb.TsdbQuery{
TimeRange: tsdb.NewTimeRange("5m", "now"),
}
......@@ -101,12 +101,12 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
query := Query{}
Convey("render from: 2h to now-1h", func() {
query := Query{}
queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("2h", "now-1h")}
queryContext := &tsdb.TsdbQuery{TimeRange: tsdb.NewTimeRange("2h", "now-1h")}
So(query.renderTimeFilter(queryContext), ShouldEqual, "time > now() - 2h and time < now() - 1h")
})
Convey("render from: 10m", func() {
queryContext := &tsdb.QueryContext{TimeRange: tsdb.NewTimeRange("10m", "now")}
queryContext := &tsdb.TsdbQuery{TimeRange: tsdb.NewTimeRange("10m", "now")}
So(query.renderTimeFilter(queryContext), ShouldEqual, "time > now() - 10m")
})
})
......
......@@ -6,6 +6,18 @@ import (
"github.com/grafana/grafana/pkg/models"
)
type TsdbQuery struct {
TimeRange *TimeRange
Queries QuerySlice
}
func NewQueryContext(queries QuerySlice, timeRange *TimeRange) *TsdbQuery {
return &TsdbQuery{
TimeRange: timeRange,
Queries: queries,
}
}
type Query struct {
RefId string
Model *simplejson.Json
......
......@@ -12,7 +12,7 @@ func NewQueryParser() *QueryParser {
type QueryParser struct{}
func (qp *QueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource, queryContext *tsdb.QueryContext) (*Query, error) {
func (qp *QueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*Query, error) {
query := &Query{TimeRange: queryContext.TimeRange}
query.AddClusterToAlias = model.Get("addClusterToAlias").MustBool(false)
query.AddHostToAlias = model.Get("addHostToAlias").MustBool(false)
......
......@@ -14,7 +14,7 @@ func TestMQEQueryParser(t *testing.T) {
parser := &QueryParser{}
dsInfo := &models.DataSource{JsonData: simplejson.New()}
queryContext := &tsdb.QueryContext{}
queryContext := &tsdb.TsdbQuery{}
Convey("can parse simple mqe model", func() {
json := `
......
......@@ -44,7 +44,7 @@ type QueryToSend struct {
QueryRef *Query
}
func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
availableSeries, err := e.tokenClient.GetTokenData(ctx)
......
......@@ -81,7 +81,7 @@ func (e *MysqlExecutor) initEngine() error {
return nil
}
func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{
QueryResults: make(map[string]*tsdb.QueryResult),
}
......
......@@ -48,7 +48,7 @@ func init() {
tsdb.RegisterExecutor("opentsdb", NewOpenTsdbExecutor)
}
func (e *OpenTsdbExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
func (e *OpenTsdbExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
var tsdbQuery OpenTsdbQuery
......
......@@ -84,7 +84,7 @@ func (e *PrometheusExecutor) getClient() (apiv1.API, error) {
return apiv1.NewAPI(client), nil
}
func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
client, err := e.getClient()
......@@ -142,7 +142,7 @@ func formatLegend(metric model.Metric, query *PrometheusQuery) string {
return string(result)
}
func parseQuery(queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) (*PrometheusQuery, error) {
func parseQuery(queries tsdb.QuerySlice, queryContext *tsdb.TsdbQuery) (*PrometheusQuery, error) {
queryModel := queries[0]
expr, err := queryModel.Model.Get("expr").String()
......
package tsdb
import "sync"
type QueryContext struct {
TimeRange *TimeRange
Queries QuerySlice
Results map[string]*QueryResult
ResultsChan chan *BatchResult
Lock sync.RWMutex
BatchWaits sync.WaitGroup
}
func NewQueryContext(queries QuerySlice, timeRange *TimeRange) *QueryContext {
return &QueryContext{
TimeRange: timeRange,
Queries: queries,
ResultsChan: make(chan *BatchResult),
Results: make(map[string]*QueryResult),
}
}
......@@ -7,7 +7,7 @@ import (
type HandleRequestFunc func(ctx context.Context, req *Request) (*Response, error)
func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
context := NewQueryContext(req.Queries, req.TimeRange)
tsdbQuery := NewQueryContext(req.Queries, req.TimeRange)
batches, err := getBatches(req)
if err != nil {
......@@ -15,20 +15,23 @@ func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
}
currentlyExecuting := 0
resultsChan := make(chan *BatchResult)
for _, batch := range batches {
if len(batch.Depends) == 0 {
currentlyExecuting += 1
batch.Started = true
go batch.process(ctx, context)
go batch.process(ctx, resultsChan, tsdbQuery)
}
}
response := &Response{}
response := &Response{
Results: make(map[string]*QueryResult),
}
for currentlyExecuting != 0 {
select {
case batchResult := <-context.ResultsChan:
case batchResult := <-resultsChan:
currentlyExecuting -= 1
response.BatchTimings = append(response.BatchTimings, batchResult.Timings)
......@@ -38,7 +41,7 @@ func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
}
for refId, result := range batchResult.QueryResults {
context.Results[refId] = result
response.Results[refId] = result
}
for _, batch := range batches {
......@@ -47,10 +50,10 @@ func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
continue
}
if batch.allDependenciesAreIn(context) {
if batch.allDependenciesAreIn(response) {
currentlyExecuting += 1
batch.Started = true
go batch.process(ctx, context)
go batch.process(ctx, resultsChan, tsdbQuery)
}
}
case <-ctx.Done():
......@@ -58,6 +61,6 @@ func HandleRequest(ctx context.Context, req *Request) (*Response, error) {
}
}
response.Results = context.Results
//response.Results = tsdbQuery.Results
return response, nil
}
......@@ -11,7 +11,7 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
type ScenarioHandler func(query *tsdb.Query, context *tsdb.QueryContext) *tsdb.QueryResult
type ScenarioHandler func(query *tsdb.Query, context *tsdb.TsdbQuery) *tsdb.QueryResult
type Scenario struct {
Id string `json:"id"`
......@@ -33,7 +33,7 @@ func init() {
Id: "random_walk",
Name: "Random Walk",
Handler: func(query *tsdb.Query, context *tsdb.QueryContext) *tsdb.QueryResult {
Handler: func(query *tsdb.Query, context *tsdb.TsdbQuery) *tsdb.QueryResult {
timeWalkerMs := context.TimeRange.GetFromAsMsEpoch()
to := context.TimeRange.GetToAsMsEpoch()
......@@ -60,7 +60,7 @@ func init() {
registerScenario(&Scenario{
Id: "no_data_points",
Name: "No Data Points",
Handler: func(query *tsdb.Query, context *tsdb.QueryContext) *tsdb.QueryResult {
Handler: func(query *tsdb.Query, context *tsdb.TsdbQuery) *tsdb.QueryResult {
return tsdb.NewQueryResult()
},
})
......@@ -68,7 +68,7 @@ func init() {
registerScenario(&Scenario{
Id: "datapoints_outside_range",
Name: "Datapoints Outside Range",
Handler: func(query *tsdb.Query, context *tsdb.QueryContext) *tsdb.QueryResult {
Handler: func(query *tsdb.Query, context *tsdb.TsdbQuery) *tsdb.QueryResult {
queryRes := tsdb.NewQueryResult()
series := newSeriesForQuery(query)
......@@ -85,7 +85,7 @@ func init() {
Id: "csv_metric_values",
Name: "CSV Metric Values",
StringInput: "1,20,90,30,5,0",
Handler: func(query *tsdb.Query, context *tsdb.QueryContext) *tsdb.QueryResult {
Handler: func(query *tsdb.Query, context *tsdb.TsdbQuery) *tsdb.QueryResult {
queryRes := tsdb.NewQueryResult()
stringInput := query.Model.Get("stringInput").MustString()
......
......@@ -24,7 +24,7 @@ func init() {
tsdb.RegisterExecutor("grafana-testdata-datasource", NewTestDataExecutor)
}
func (e *TestDataExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
func (e *TestDataExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{}
result.QueryResults = make(map[string]*tsdb.QueryResult)
......
......@@ -140,14 +140,14 @@ func TestMetricQuery(t *testing.T) {
}
fakeExecutor := registerFakeExecutor()
fakeExecutor.HandleQuery("A", func(c *QueryContext) *QueryResult {
fakeExecutor.HandleQuery("A", func(c *TsdbQuery) *QueryResult {
time.Sleep(10 * time.Millisecond)
return &QueryResult{
Series: TimeSeriesSlice{
&TimeSeries{Name: "Ares"},
}}
})
fakeExecutor.HandleQuery("B", func(c *QueryContext) *QueryResult {
fakeExecutor.HandleQuery("B", func(c *TsdbQuery) *QueryResult {
return &QueryResult{
Series: TimeSeriesSlice{
&TimeSeries{Name: "Bres+" + c.Results["A"].Series[0].Name},
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment