Commit 6d57e422 by bergquist

feat(mqe): restricts the executor to max 4 concurrent outgoing requests

parent 7425c635
package mqe
import (
"context"
"net/http"
"net/url"
"path"
"strings"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
"golang.org/x/net/context/ctxhttp"
)
var (
MaxWorker int = 4
)
type apiClient struct {
*models.DataSource
log log.Logger
httpClient *http.Client
responseParser *ResponseParser
}
func NewApiClient(httpClient *http.Client, datasource *models.DataSource) *apiClient {
return &apiClient{
DataSource: datasource,
log: log.New("tsdb.mqe"),
httpClient: httpClient,
responseParser: NewResponseParser(),
}
}
func (e *apiClient) PerformRequests(ctx context.Context, queries []QueryToSend) (*tsdb.QueryResult, error) {
queryResult := &tsdb.QueryResult{}
queryCount := len(queries)
jobsChan := make(chan QueryToSend, queryCount)
resultChan := make(chan []*tsdb.TimeSeries, queryCount)
errorsChan := make(chan error, 1)
for w := 1; w <= MaxWorker; w++ {
go e.spawnWorker(ctx, w, jobsChan, resultChan, errorsChan)
}
for _, v := range queries {
jobsChan <- v
}
close(jobsChan)
resultCounter := 0
for {
select {
case timeseries := <-resultChan:
queryResult.Series = append(queryResult.Series, timeseries...)
resultCounter++
if resultCounter == queryCount {
close(resultChan)
return queryResult, nil
}
case err := <-errorsChan:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
func (e *apiClient) spawnWorker(ctx context.Context, id int, jobs chan QueryToSend, results chan []*tsdb.TimeSeries, errors chan error) {
e.log.Debug("Spawning worker", "id", id)
for query := range jobs {
if setting.Env == setting.DEV {
e.log.Debug("Sending request", "query", query.RawQuery)
}
req, err := e.createRequest(query.RawQuery)
resp, err := ctxhttp.Do(ctx, e.httpClient, req)
if err != nil {
errors <- err
return
}
series, err := e.responseParser.Parse(resp, query.QueryRef)
if err != nil {
errors <- err
return
}
results <- series
}
e.log.Debug("Worker is complete", "id", id)
}
func (e *apiClient) createRequest(query string) (*http.Request, error) {
u, err := url.Parse(e.Url)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, "query")
payload := simplejson.New()
payload.Set("query", query)
jsonPayload, err := payload.MarshalJSON()
if err != nil {
return nil, err
}
req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(string(jsonPayload)))
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", "Grafana")
req.Header.Set("Content-Type", "application/json")
if e.BasicAuth {
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
}
return req, nil
}
...@@ -3,16 +3,9 @@ package mqe ...@@ -3,16 +3,9 @@ package mqe
import ( import (
"context" "context"
"net/http" "net/http"
"net/url"
"path"
"strings"
"golang.org/x/net/context/ctxhttp"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/tsdb"
) )
...@@ -24,11 +17,11 @@ import ( ...@@ -24,11 +17,11 @@ import (
type MQEExecutor struct { type MQEExecutor struct {
*models.DataSource *models.DataSource
queryParser *QueryParser queryParser *QueryParser
responseParser *ResponseParser apiClient *apiClient
httpClient *http.Client httpClient *http.Client
log log.Logger log log.Logger
tokenClient *TokenClient tokenClient *TokenClient
} }
func NewMQEExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) { func NewMQEExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) {
...@@ -38,12 +31,12 @@ func NewMQEExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) { ...@@ -38,12 +31,12 @@ func NewMQEExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) {
} }
return &MQEExecutor{ return &MQEExecutor{
DataSource: dsInfo, DataSource: dsInfo,
httpClient: httpclient, httpClient: httpclient,
log: log.New("tsdb.mqe"), log: log.New("tsdb.mqe"),
queryParser: NewQueryParser(), queryParser: NewQueryParser(),
responseParser: NewResponseParser(), apiClient: NewApiClient(httpclient, dsInfo),
tokenClient: NewTokenClient(dsInfo), tokenClient: NewTokenClient(dsInfo),
}, nil }, nil
} }
...@@ -85,25 +78,9 @@ func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, quer ...@@ -85,25 +78,9 @@ func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, quer
e.log.Debug("Sending request", "url", e.DataSource.Url) e.log.Debug("Sending request", "url", e.DataSource.Url)
queryResult := &tsdb.QueryResult{} queryResult, err := e.apiClient.PerformRequests(ctx, rawQueries)
for _, v := range rawQueries { if err != nil {
if setting.Env == setting.DEV { return result.WithError(err)
e.log.Debug("Executing", "query", v)
}
req, err := e.createRequest(v.RawQuery)
resp, err := ctxhttp.Do(ctx, e.httpClient, req)
if err != nil {
return result.WithError(err)
}
series, err := e.responseParser.Parse(resp, v.QueryRef)
if err != nil {
return result.WithError(err)
}
queryResult.Series = append(queryResult.Series, series.Series...)
} }
result.QueryResults = make(map[string]*tsdb.QueryResult) result.QueryResults = make(map[string]*tsdb.QueryResult)
...@@ -111,34 +88,3 @@ func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, quer ...@@ -111,34 +88,3 @@ func (e *MQEExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, quer
return result return result
} }
func (e *MQEExecutor) createRequest(query string) (*http.Request, error) {
u, err := url.Parse(e.Url)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, "query")
payload := simplejson.New()
payload.Set("query", query)
jsonPayload, err := payload.MarshalJSON()
if err != nil {
return nil, err
}
req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(string(jsonPayload)))
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", "Grafana")
req.Header.Set("Content-Type", "application/json")
if e.BasicAuth {
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword)
}
return req, nil
}
...@@ -48,7 +48,7 @@ type ResponseParser struct { ...@@ -48,7 +48,7 @@ type ResponseParser struct {
log log.Logger log log.Logger
} }
func (parser *ResponseParser) Parse(res *http.Response, queryRef *Query) (*tsdb.QueryResult, error) { func (parser *ResponseParser) Parse(res *http.Response, queryRef *Query) ([]*tsdb.TimeSeries, error) {
body, err := ioutil.ReadAll(res.Body) body, err := ioutil.ReadAll(res.Body)
defer res.Body.Close() defer res.Body.Close()
if err != nil { if err != nil {
...@@ -71,7 +71,7 @@ func (parser *ResponseParser) Parse(res *http.Response, queryRef *Query) (*tsdb. ...@@ -71,7 +71,7 @@ func (parser *ResponseParser) Parse(res *http.Response, queryRef *Query) (*tsdb.
return nil, fmt.Errorf("Request failed.") return nil, fmt.Errorf("Request failed.")
} }
var series tsdb.TimeSeriesSlice var series []*tsdb.TimeSeries
for _, body := range data.Body { for _, body := range data.Body {
for _, mqeSerie := range body.Series { for _, mqeSerie := range body.Series {
namePrefix := "" namePrefix := ""
...@@ -97,5 +97,5 @@ func (parser *ResponseParser) Parse(res *http.Response, queryRef *Query) (*tsdb. ...@@ -97,5 +97,5 @@ func (parser *ResponseParser) Parse(res *http.Response, queryRef *Query) (*tsdb.
} }
} }
return &tsdb.QueryResult{Series: series}, nil return series, nil
} }
...@@ -31,13 +31,13 @@ func TestMQEResponseParser(t *testing.T) { ...@@ -31,13 +31,13 @@ func TestMQEResponseParser(t *testing.T) {
} }
res, err := parser.Parse(response, queryRef) res, err := parser.Parse(response, queryRef)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(len(res.Series), ShouldEqual, 2) So(len(res), ShouldEqual, 2)
So(len(res.Series[0].Points), ShouldEqual, 14) So(len(res[0].Points), ShouldEqual, 14)
So(res.Series[0].Name, ShouldEqual, "demoapp staples-lab-1 os.disk.sda3.weighted_io_time") So(res[0].Name, ShouldEqual, "demoapp staples-lab-1 os.disk.sda3.weighted_io_time")
startTime := 1479287280000 startTime := 1479287280000
for i := 0; i < 11; i++ { for i := 0; i < 11; i++ {
So(res.Series[0].Points[i][0].Float64, ShouldEqual, i+1) So(res[0].Points[i][0].Float64, ShouldEqual, i+1)
So(res.Series[0].Points[i][1].Float64, ShouldEqual, startTime+(i*30000)) So(res[0].Points[i][1].Float64, ShouldEqual, startTime+(i*30000))
} }
}) })
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment