Commit 16b5b9f6 by bergquist

make ds a param for Query

parent c0260fd9
...@@ -20,7 +20,7 @@ func NewFakeExecutor(dsInfo *models.DataSource) (*FakeExecutor, error) { ...@@ -20,7 +20,7 @@ func NewFakeExecutor(dsInfo *models.DataSource) (*FakeExecutor, error) {
}, nil }, nil
} }
func (e *FakeExecutor) Query(ctx context.Context, context *TsdbQuery) *BatchResult { func (e *FakeExecutor) Query(ctx context.Context, dsInfo *models.DataSource, context *TsdbQuery) *BatchResult {
result := &BatchResult{QueryResults: make(map[string]*QueryResult)} result := &BatchResult{QueryResults: make(map[string]*QueryResult)}
for _, query := range context.Queries { for _, query := range context.Queries {
if results, has := e.results[query.RefId]; has { if results, has := e.results[query.RefId]; has {
......
...@@ -21,21 +21,11 @@ import ( ...@@ -21,21 +21,11 @@ import (
) )
type GraphiteExecutor struct { type GraphiteExecutor struct {
*models.DataSource
HttpClient *http.Client HttpClient *http.Client
} }
func NewGraphiteExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { func NewGraphiteExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
httpClient, err := datasource.GetHttpClient() return &GraphiteExecutor{}, nil
if err != nil {
return nil, err
}
return &GraphiteExecutor{
DataSource: datasource,
HttpClient: httpClient,
}, nil
} }
var ( var (
...@@ -47,7 +37,7 @@ func init() { ...@@ -47,7 +37,7 @@ func init() {
tsdb.RegisterTsdbQueryEndpoint("graphite", NewGraphiteExecutor) tsdb.RegisterTsdbQueryEndpoint("graphite", NewGraphiteExecutor)
} }
func (e *GraphiteExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) *tsdb.BatchResult { func (e *GraphiteExecutor) Query(ctx context.Context, dsInfo *models.DataSource, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{} result := &tsdb.BatchResult{}
from := "-" + formatTimeRange(context.TimeRange.From) from := "-" + formatTimeRange(context.TimeRange.From)
...@@ -75,7 +65,13 @@ func (e *GraphiteExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) * ...@@ -75,7 +65,13 @@ func (e *GraphiteExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) *
glog.Debug("Graphite request", "params", formData) glog.Debug("Graphite request", "params", formData)
} }
req, err := e.createRequest(formData) req, err := e.createRequest(dsInfo, formData)
if err != nil {
result.Error = err
return result
}
httpClient, err := dsInfo.GetHttpClient()
if err != nil { if err != nil {
result.Error = err result.Error = err
return result return result
...@@ -92,7 +88,7 @@ func (e *GraphiteExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) * ...@@ -92,7 +88,7 @@ func (e *GraphiteExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) *
opentracing.HTTPHeaders, opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header)) opentracing.HTTPHeadersCarrier(req.Header))
res, err := ctxhttp.Do(ctx, e.HttpClient, req) res, err := ctxhttp.Do(ctx, httpClient, req)
if err != nil { if err != nil {
result.Error = err result.Error = err
return result return result
...@@ -144,8 +140,8 @@ func (e *GraphiteExecutor) parseResponse(res *http.Response) ([]TargetResponseDT ...@@ -144,8 +140,8 @@ func (e *GraphiteExecutor) parseResponse(res *http.Response) ([]TargetResponseDT
return data, nil return data, nil
} }
func (e *GraphiteExecutor) createRequest(data url.Values) (*http.Request, error) { func (e *GraphiteExecutor) createRequest(dsInfo *models.DataSource, data url.Values) (*http.Request, error) {
u, _ := url.Parse(e.Url) u, _ := url.Parse(dsInfo.Url)
u.Path = path.Join(u.Path, "render") u.Path = path.Join(u.Path, "render")
req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(data.Encode())) req, err := http.NewRequest(http.MethodPost, u.String(), strings.NewReader(data.Encode()))
...@@ -155,8 +151,8 @@ func (e *GraphiteExecutor) createRequest(data url.Values) (*http.Request, error) ...@@ -155,8 +151,8 @@ func (e *GraphiteExecutor) createRequest(data url.Values) (*http.Request, error)
} }
req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
if e.BasicAuth { if dsInfo.BasicAuth {
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword) req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.BasicAuthPassword)
} }
return req, err return req, err
......
...@@ -17,24 +17,16 @@ import ( ...@@ -17,24 +17,16 @@ import (
) )
type InfluxDBExecutor struct { type InfluxDBExecutor struct {
*models.DataSource //*models.DataSource
QueryParser *InfluxdbQueryParser QueryParser *InfluxdbQueryParser
ResponseParser *ResponseParser ResponseParser *ResponseParser
HttpClient *http.Client //HttpClient *http.Client
} }
func NewInfluxDBExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { func NewInfluxDBExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
httpClient, err := datasource.GetHttpClient()
if err != nil {
return nil, err
}
return &InfluxDBExecutor{ return &InfluxDBExecutor{
DataSource: datasource,
QueryParser: &InfluxdbQueryParser{}, QueryParser: &InfluxdbQueryParser{},
ResponseParser: &ResponseParser{}, ResponseParser: &ResponseParser{},
HttpClient: httpClient,
}, nil }, nil
} }
...@@ -47,10 +39,10 @@ func init() { ...@@ -47,10 +39,10 @@ func init() {
tsdb.RegisterTsdbQueryEndpoint("influxdb", NewInfluxDBExecutor) tsdb.RegisterTsdbQueryEndpoint("influxdb", NewInfluxDBExecutor)
} }
func (e *InfluxDBExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) *tsdb.BatchResult { func (e *InfluxDBExecutor) Query(ctx context.Context, dsInfo *models.DataSource, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{} result := &tsdb.BatchResult{}
query, err := e.getQuery(context.Queries, context) query, err := e.getQuery(dsInfo, context.Queries, context)
if err != nil { if err != nil {
return result.WithError(err) return result.WithError(err)
} }
...@@ -64,12 +56,17 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) * ...@@ -64,12 +56,17 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) *
glog.Debug("Influxdb query", "raw query", rawQuery) glog.Debug("Influxdb query", "raw query", rawQuery)
} }
req, err := e.createRequest(rawQuery) req, err := e.createRequest(dsInfo, rawQuery)
if err != nil {
return result.WithError(err)
}
httpClient, err := dsInfo.GetHttpClient()
if err != nil { if err != nil {
return result.WithError(err) return result.WithError(err)
} }
resp, err := ctxhttp.Do(ctx, e.HttpClient, req) resp, err := ctxhttp.Do(ctx, httpClient, req)
if err != nil { if err != nil {
return result.WithError(err) return result.WithError(err)
} }
...@@ -98,10 +95,10 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) * ...@@ -98,10 +95,10 @@ func (e *InfluxDBExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) *
return result return result
} }
func (e *InfluxDBExecutor) getQuery(queries []*tsdb.Query, context *tsdb.TsdbQuery) (*Query, error) { func (e *InfluxDBExecutor) getQuery(dsInfo *models.DataSource, queries []*tsdb.Query, context *tsdb.TsdbQuery) (*Query, error) {
for _, v := range queries { for _, v := range queries {
query, err := e.QueryParser.Parse(v.Model, e.DataSource) query, err := e.QueryParser.Parse(v.Model, dsInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -112,8 +109,8 @@ func (e *InfluxDBExecutor) getQuery(queries []*tsdb.Query, context *tsdb.TsdbQue ...@@ -112,8 +109,8 @@ func (e *InfluxDBExecutor) getQuery(queries []*tsdb.Query, context *tsdb.TsdbQue
return nil, fmt.Errorf("query request contains no queries") return nil, fmt.Errorf("query request contains no queries")
} }
func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) { func (e *InfluxDBExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) {
u, _ := url.Parse(e.Url) u, _ := url.Parse(dsInfo.Url)
u.Path = path.Join(u.Path, "query") u.Path = path.Join(u.Path, "query")
req, err := http.NewRequest(http.MethodGet, u.String(), nil) req, err := http.NewRequest(http.MethodGet, u.String(), nil)
...@@ -123,18 +120,18 @@ func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) { ...@@ -123,18 +120,18 @@ func (e *InfluxDBExecutor) createRequest(query string) (*http.Request, error) {
params := req.URL.Query() params := req.URL.Query()
params.Set("q", query) params.Set("q", query)
params.Set("db", e.Database) params.Set("db", dsInfo.Database)
params.Set("epoch", "s") params.Set("epoch", "s")
req.URL.RawQuery = params.Encode() req.URL.RawQuery = params.Encode()
req.Header.Set("User-Agent", "Grafana") req.Header.Set("User-Agent", "Grafana")
if e.BasicAuth { if dsInfo.BasicAuth {
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword) req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.BasicAuthPassword)
} }
if !e.BasicAuth && e.User != "" { if !dsInfo.BasicAuth && dsInfo.User != "" {
req.SetBasicAuth(e.User, e.Password) req.SetBasicAuth(dsInfo.User, dsInfo.Password)
} }
glog.Debug("Influxdb request", "url", req.URL.String()) glog.Debug("Influxdb request", "url", req.URL.String())
......
...@@ -21,7 +21,6 @@ import ( ...@@ -21,7 +21,6 @@ import (
) )
type MysqlExecutor struct { type MysqlExecutor struct {
datasource *models.DataSource
engine *xorm.Engine engine *xorm.Engine
log log.Logger log log.Logger
} }
...@@ -43,11 +42,10 @@ func init() { ...@@ -43,11 +42,10 @@ func init() {
func NewMysqlExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { func NewMysqlExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
executor := &MysqlExecutor{ executor := &MysqlExecutor{
datasource: datasource,
log: log.New("tsdb.mysql"), log: log.New("tsdb.mysql"),
} }
err := executor.initEngine() err := executor.initEngine(datasource)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -55,18 +53,24 @@ func NewMysqlExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, er ...@@ -55,18 +53,24 @@ func NewMysqlExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, er
return executor, nil return executor, nil
} }
func (e *MysqlExecutor) initEngine() error { func (e *MysqlExecutor) initEngine(dsInfo *models.DataSource) error {
engineCache.Lock() engineCache.Lock()
defer engineCache.Unlock() defer engineCache.Unlock()
if engine, present := engineCache.cache[e.datasource.Id]; present { if engine, present := engineCache.cache[dsInfo.Id]; present {
if version, _ := engineCache.versions[e.datasource.Id]; version == e.datasource.Version { if version, _ := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
e.engine = engine e.engine = engine
return nil return nil
} }
} }
cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC", e.datasource.User, e.datasource.Password, "tcp", e.datasource.Url, e.datasource.Database) cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&parseTime=true&loc=UTC",
dsInfo.User,
dsInfo.Password,
"tcp",
dsInfo.Url,
dsInfo.Database)
e.log.Debug("getEngine", "connection", cnnstr) e.log.Debug("getEngine", "connection", cnnstr)
engine, err := xorm.NewEngine("mysql", cnnstr) engine, err := xorm.NewEngine("mysql", cnnstr)
...@@ -76,12 +80,12 @@ func (e *MysqlExecutor) initEngine() error { ...@@ -76,12 +80,12 @@ func (e *MysqlExecutor) initEngine() error {
return err return err
} }
engineCache.cache[e.datasource.Id] = engine engineCache.cache[dsInfo.Id] = engine
e.engine = engine e.engine = engine
return nil return nil
} }
func (e *MysqlExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) *tsdb.BatchResult { func (e *MysqlExecutor) Query(ctx context.Context, dsInfo *models.DataSource, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{ result := &tsdb.BatchResult{
QueryResults: make(map[string]*tsdb.QueryResult), QueryResults: make(map[string]*tsdb.QueryResult),
} }
......
...@@ -22,20 +22,22 @@ import ( ...@@ -22,20 +22,22 @@ import (
) )
type OpenTsdbExecutor struct { type OpenTsdbExecutor struct {
*models.DataSource //*models.DataSource
httpClient *http.Client //httpClient *http.Client
} }
func NewOpenTsdbExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { func NewOpenTsdbExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
/*
httpClient, err := datasource.GetHttpClient() httpClient, err := datasource.GetHttpClient()
if err != nil { if err != nil {
return nil, err return nil, err
} }
*/
return &OpenTsdbExecutor{ return &OpenTsdbExecutor{
DataSource: datasource, //DataSource: datasource,
httpClient: httpClient, //httpClient: httpClient,
}, nil }, nil
} }
...@@ -48,7 +50,7 @@ func init() { ...@@ -48,7 +50,7 @@ func init() {
tsdb.RegisterTsdbQueryEndpoint("opentsdb", NewOpenTsdbExecutor) tsdb.RegisterTsdbQueryEndpoint("opentsdb", NewOpenTsdbExecutor)
} }
func (e *OpenTsdbExecutor) Query(ctx context.Context, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult { func (e *OpenTsdbExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{} result := &tsdb.BatchResult{}
var tsdbQuery OpenTsdbQuery var tsdbQuery OpenTsdbQuery
...@@ -65,13 +67,19 @@ func (e *OpenTsdbExecutor) Query(ctx context.Context, queryContext *tsdb.TsdbQue ...@@ -65,13 +67,19 @@ func (e *OpenTsdbExecutor) Query(ctx context.Context, queryContext *tsdb.TsdbQue
plog.Debug("OpenTsdb request", "params", tsdbQuery) plog.Debug("OpenTsdb request", "params", tsdbQuery)
} }
req, err := e.createRequest(tsdbQuery) req, err := e.createRequest(dsInfo, tsdbQuery)
if err != nil { if err != nil {
result.Error = err result.Error = err
return result return result
} }
res, err := ctxhttp.Do(ctx, e.httpClient, req) httpClient, err := dsInfo.GetHttpClient()
if err != nil {
result.Error = err
return result
}
res, err := ctxhttp.Do(ctx, httpClient, req)
if err != nil { if err != nil {
result.Error = err result.Error = err
return result return result
...@@ -86,8 +94,8 @@ func (e *OpenTsdbExecutor) Query(ctx context.Context, queryContext *tsdb.TsdbQue ...@@ -86,8 +94,8 @@ func (e *OpenTsdbExecutor) Query(ctx context.Context, queryContext *tsdb.TsdbQue
return result return result
} }
func (e *OpenTsdbExecutor) createRequest(data OpenTsdbQuery) (*http.Request, error) { func (e *OpenTsdbExecutor) createRequest(dsInfo *models.DataSource, data OpenTsdbQuery) (*http.Request, error) {
u, _ := url.Parse(e.Url) u, _ := url.Parse(dsInfo.Url)
u.Path = path.Join(u.Path, "api/query") u.Path = path.Join(u.Path, "api/query")
postData, err := json.Marshal(data) postData, err := json.Marshal(data)
...@@ -99,8 +107,8 @@ func (e *OpenTsdbExecutor) createRequest(data OpenTsdbQuery) (*http.Request, err ...@@ -99,8 +107,8 @@ func (e *OpenTsdbExecutor) createRequest(data OpenTsdbQuery) (*http.Request, err
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
if e.BasicAuth { if dsInfo.BasicAuth {
req.SetBasicAuth(e.BasicAuthUser, e.BasicAuthPassword) req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.BasicAuthPassword)
} }
return req, err return req, err
......
...@@ -18,12 +18,9 @@ import ( ...@@ -18,12 +18,9 @@ import (
api "github.com/prometheus/client_golang/api" api "github.com/prometheus/client_golang/api"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1" apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
//api "github.com/prometheus/client_golang/api"
//apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
) )
type PrometheusExecutor struct { type PrometheusExecutor struct {
*models.DataSource
Transport *http.Transport Transport *http.Transport
} }
...@@ -46,7 +43,6 @@ func NewPrometheusExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, e ...@@ -46,7 +43,6 @@ func NewPrometheusExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, e
} }
return &PrometheusExecutor{ return &PrometheusExecutor{
DataSource: dsInfo,
Transport: transport, Transport: transport,
}, nil }, nil
} }
...@@ -62,17 +58,17 @@ func init() { ...@@ -62,17 +58,17 @@ func init() {
legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`) legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
} }
func (e *PrometheusExecutor) getClient() (apiv1.API, error) { func (e *PrometheusExecutor) getClient(dsInfo *models.DataSource) (apiv1.API, error) {
cfg := api.Config{ cfg := api.Config{
Address: e.DataSource.Url, Address: dsInfo.Url,
RoundTripper: e.Transport, RoundTripper: e.Transport,
} }
if e.BasicAuth { if dsInfo.BasicAuth {
cfg.RoundTripper = basicAuthTransport{ cfg.RoundTripper = basicAuthTransport{
Transport: e.Transport, Transport: e.Transport,
username: e.BasicAuthUser, username: dsInfo.BasicAuthUser,
password: e.BasicAuthPassword, password: dsInfo.BasicAuthPassword,
} }
} }
...@@ -84,10 +80,10 @@ func (e *PrometheusExecutor) getClient() (apiv1.API, error) { ...@@ -84,10 +80,10 @@ func (e *PrometheusExecutor) getClient() (apiv1.API, error) {
return apiv1.NewAPI(client), nil return apiv1.NewAPI(client), nil
} }
func (e *PrometheusExecutor) Query(ctx context.Context, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult { func (e *PrometheusExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{} result := &tsdb.BatchResult{}
client, err := e.getClient() client, err := e.getClient(dsInfo)
if err != nil { if err != nil {
return result.WithError(err) return result.WithError(err)
} }
......
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
) )
type TsdbQueryEndpoint interface { type TsdbQueryEndpoint interface {
Query(ctx context.Context, query *TsdbQuery) *BatchResult Query(ctx context.Context, ds *models.DataSource, query *TsdbQuery) *BatchResult
} }
var registry map[string]GetTsdbQueryEndpointFn var registry map[string]GetTsdbQueryEndpointFn
......
...@@ -8,13 +8,13 @@ type HandleRequestFunc func(ctx context.Context, req *TsdbQuery) (*Response, err ...@@ -8,13 +8,13 @@ type HandleRequestFunc func(ctx context.Context, req *TsdbQuery) (*Response, err
func HandleRequest(ctx context.Context, req *TsdbQuery) (*Response, error) { func HandleRequest(ctx context.Context, req *TsdbQuery) (*Response, error) {
//TODO niceify //TODO niceify
endpoint, err := getTsdbQueryEndpointFor(req.Queries[0].DataSource) ds := req.Queries[0].DataSource
endpoint, err := getTsdbQueryEndpointFor(ds)
if err != nil { if err != nil {
return nil, err return nil, err
} }
res := endpoint.Query(ctx, req) res := endpoint.Query(ctx, ds, req)
if res.Error != nil { if res.Error != nil {
return nil, res.Error return nil, res.Error
} }
......
...@@ -24,7 +24,7 @@ func init() { ...@@ -24,7 +24,7 @@ func init() {
tsdb.RegisterTsdbQueryEndpoint("grafana-testdata-datasource", NewTestDataExecutor) tsdb.RegisterTsdbQueryEndpoint("grafana-testdata-datasource", NewTestDataExecutor)
} }
func (e *TestDataExecutor) Query(ctx context.Context, context *tsdb.TsdbQuery) *tsdb.BatchResult { func (e *TestDataExecutor) Query(ctx context.Context, dsInfo *models.DataSource, context *tsdb.TsdbQuery) *tsdb.BatchResult {
result := &tsdb.BatchResult{} result := &tsdb.BatchResult{}
result.QueryResults = make(map[string]*tsdb.QueryResult) result.QueryResults = make(map[string]*tsdb.QueryResult)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment