Commit 2c7adccf by Carl Bergquist Committed by Torkel Ödegaard

Use cache for http.client in tsdb package. (#6833)

* datasource: move caching closer to datasource struct

* tsdb: use cached version of datasource http transport

closes #6825
parent 24172fca
package api
import (
"crypto/tls"
"crypto/x509"
"net"
"net/http"
"net/http/httputil"
"net/url"
"sync"
"time"
"github.com/grafana/grafana/pkg/api/cloudwatch"
......@@ -19,75 +15,6 @@ import (
"github.com/grafana/grafana/pkg/util"
)
type proxyTransportCache struct {
cache map[int64]cachedTransport
sync.Mutex
}
type cachedTransport struct {
updated time.Time
*http.Transport
}
var ptc = proxyTransportCache{
cache: make(map[int64]cachedTransport),
}
func DataProxyTransport(ds *m.DataSource) (*http.Transport, error) {
ptc.Lock()
defer ptc.Unlock()
if t, present := ptc.cache[ds.Id]; present && ds.Updated.Equal(t.updated) {
return t.Transport, nil
}
transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
}
var tlsAuth, tlsAuthWithCACert bool
if ds.JsonData != nil {
tlsAuth = ds.JsonData.Get("tlsAuth").MustBool(false)
tlsAuthWithCACert = ds.JsonData.Get("tlsAuthWithCACert").MustBool(false)
}
if tlsAuth {
transport.TLSClientConfig.InsecureSkipVerify = false
decrypted := ds.SecureJsonData.Decrypt()
if tlsAuthWithCACert && len(decrypted["tlsCACert"]) > 0 {
caPool := x509.NewCertPool()
ok := caPool.AppendCertsFromPEM([]byte(decrypted["tlsCACert"]))
if ok {
transport.TLSClientConfig.RootCAs = caPool
}
}
cert, err := tls.X509KeyPair([]byte(decrypted["tlsClientCert"]), []byte(decrypted["tlsClientKey"]))
if err != nil {
return nil, err
}
transport.TLSClientConfig.Certificates = []tls.Certificate{cert}
}
ptc.cache[ds.Id] = cachedTransport{
Transport: transport,
updated: ds.Updated,
}
return transport, nil
}
func NewReverseProxy(ds *m.DataSource, proxyPath string, targetUrl *url.URL) *httputil.ReverseProxy {
director := func(req *http.Request) {
req.URL.Scheme = targetUrl.Scheme
......@@ -189,7 +116,7 @@ func ProxyDataSourceRequest(c *middleware.Context) {
}
proxy := NewReverseProxy(ds, proxyPath, targetUrl)
proxy.Transport, err = DataProxyTransport(ds)
proxy.Transport, err = ds.GetHttpTransport()
if err != nil {
c.JsonApiErr(400, "Unable to load TLS certificate", err)
return
......
......@@ -4,24 +4,18 @@ import (
"net/http"
"net/url"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
"github.com/grafana/grafana/pkg/components/simplejson"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
)
func TestDataSourceProxy(t *testing.T) {
Convey("When getting graphite datasource proxy", t, func() {
clearCache()
ds := m.DataSource{Url: "htttp://graphite:8080", Type: m.DS_GRAPHITE}
targetUrl, err := url.Parse(ds.Url)
proxy := NewReverseProxy(&ds, "/render", targetUrl)
proxy.Transport, err = DataProxyTransport(&ds)
proxy.Transport, err = ds.GetHttpTransport()
So(err, ShouldBeNil)
transport, ok := proxy.Transport.(*http.Transport)
......@@ -40,7 +34,6 @@ func TestDataSourceProxy(t *testing.T) {
})
Convey("When getting influxdb datasource proxy", t, func() {
clearCache()
ds := m.DataSource{
Type: m.DS_INFLUXDB_08,
Url: "http://influxdb:8083",
......@@ -67,148 +60,4 @@ func TestDataSourceProxy(t *testing.T) {
So(queryVals["p"][0], ShouldEqual, "password")
})
})
Convey("When caching a datasource proxy", t, func() {
clearCache()
ds := m.DataSource{
Id: 1,
Url: "http://k8s:8001",
Type: "Kubernetes",
}
t1, err := DataProxyTransport(&ds)
So(err, ShouldBeNil)
t2, err := DataProxyTransport(&ds)
So(err, ShouldBeNil)
Convey("Should be using the cached proxy", func() {
So(t2, ShouldEqual, t1)
})
})
Convey("When getting kubernetes datasource proxy", t, func() {
clearCache()
setting.SecretKey = "password"
json := simplejson.New()
json.Set("tlsAuth", true)
json.Set("tlsAuthWithCACert", true)
t := time.Now()
ds := m.DataSource{
Url: "http://k8s:8001",
Type: "Kubernetes",
Updated: t.Add(-2 * time.Minute),
}
transport, err := DataProxyTransport(&ds)
So(err, ShouldBeNil)
Convey("Should have no cert", func() {
So(transport.TLSClientConfig.InsecureSkipVerify, ShouldEqual, true)
})
ds.JsonData = json
ds.SecureJsonData = map[string][]byte{
"tlsCACert": util.Encrypt([]byte(caCert), "password"),
"tlsClientCert": util.Encrypt([]byte(clientCert), "password"),
"tlsClientKey": util.Encrypt([]byte(clientKey), "password"),
}
ds.Updated = t.Add(-1 * time.Minute)
transport, err = DataProxyTransport(&ds)
So(err, ShouldBeNil)
Convey("Should add cert", func() {
So(transport.TLSClientConfig.InsecureSkipVerify, ShouldEqual, false)
So(len(transport.TLSClientConfig.Certificates), ShouldEqual, 1)
})
ds.JsonData = nil
ds.SecureJsonData = map[string][]byte{}
ds.Updated = t
transport, err = DataProxyTransport(&ds)
So(err, ShouldBeNil)
Convey("Should remove cert", func() {
So(transport.TLSClientConfig.InsecureSkipVerify, ShouldEqual, true)
So(len(transport.TLSClientConfig.Certificates), ShouldEqual, 0)
})
})
}
func clearCache() {
ptc.Lock()
defer ptc.Unlock()
ptc.cache = make(map[int64]cachedTransport)
}
const caCert string = `-----BEGIN CERTIFICATE-----
MIIDATCCAemgAwIBAgIJAMQ5hC3CPDTeMA0GCSqGSIb3DQEBCwUAMBcxFTATBgNV
BAMMDGNhLWs4cy1zdGhsbTAeFw0xNjEwMjcwODQyMjdaFw00NDAzMTQwODQyMjda
MBcxFTATBgNVBAMMDGNhLWs4cy1zdGhsbTCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBAMLe2AmJ6IleeUt69vgNchOjjmxIIxz5sp1vFu94m1vUip7CqnOg
QkpUsHeBPrGYv8UGloARCL1xEWS+9FVZeXWQoDmbC0SxXhFwRIESNCET7Q8KMi/4
4YPvnMLGZi3Fjwxa8BdUBCN1cx4WEooMVTWXm7RFMtZgDfuOAn3TNXla732sfT/d
1HNFrh48b0wA+HhmA3nXoBnBEblA665hCeo7lIAdRr0zJxJpnFnWXkyTClsAUTMN
iL905LdBiiIRenojipfKXvMz88XSaWTI7JjZYU3BvhyXndkT6f12cef3I96NY3WJ
0uIK4k04WrbzdYXMU3rN6NqlvbHqnI+E7aMCAwEAAaNQME4wHQYDVR0OBBYEFHHx
2+vSPw9bECHj3O51KNo5VdWOMB8GA1UdIwQYMBaAFHHx2+vSPw9bECHj3O51KNo5
VdWOMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAH2eV5NcV3LBJHs9
I+adbiTPg2vyumrGWwy73T0X8Dtchgt8wU7Q9b9Ucg2fOTmSSyS0iMqEu1Yb2ORB
CknM9mixHC9PwEBbkGCom3VVkqdLwSP6gdILZgyLoH4i8sTUz+S1yGPepi+Vzhs7
adOXtryjcGnwft6HdfKPNklMOHFnjw6uqpho54oj/z55jUpicY/8glDHdrr1bh3k
MHuiWLGewHXPvxfG6UoUx1te65IhifVcJGFZDQwfEmhBflfCmtAJlZEsgTLlBBCh
FHoXIyGOdq1chmRVocdGBCF8fUoGIbuF14r53rpvcbEKtKnnP8+96luKAZLq0a4n
3lb92xM=
-----END CERTIFICATE-----`
const clientCert string = `-----BEGIN CERTIFICATE-----
MIICsjCCAZoCCQCcd8sOfstQLzANBgkqhkiG9w0BAQsFADAXMRUwEwYDVQQDDAxj
YS1rOHMtc3RobG0wHhcNMTYxMTAyMDkyNTE1WhcNMTcxMTAyMDkyNTE1WjAfMR0w
GwYDVQQDDBRhZG0tZGFuaWVsLWs4cy1zdGhsbTCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAOMliaWyNEUJKM37vWCl5bGub3lMicyRAqGQyY/qxD9yKKM2
FbucVcmWmg5vvTqQVl5rlQ+c7GI8OD6ptmFl8a26coEki7bFr8bkpSyBSEc5p27b
Z0ORFSqBHWHQbr9PkxPLYW6T3gZYUtRYv3OQgGxLXlvUh85n/mQfuR3N1FgmShHo
GtAFi/ht6leXa0Ms+jNSDLCmXpJm1GIEqgyKX7K3+g3vzo9coYqXq4XTa8Efs2v8
SCwqWfBC3rHfgs/5DLB8WT4Kul8QzxkytzcaBQfRfzhSV6bkgm7oTzt2/1eRRsf4
YnXzLE9YkCC9sAn+Owzqf+TYC1KRluWDfqqBTJUCAwEAATANBgkqhkiG9w0BAQsF
AAOCAQEAdMsZg6edWGC+xngizn0uamrUg1ViaDqUsz0vpzY5NWLA4MsBc4EtxWRP
ueQvjUimZ3U3+AX0YWNLIrH1FCVos2jdij/xkTUmHcwzr8rQy+B17cFi+a8jtpgw
AU6WWoaAIEhhbWQfth/Diz3mivl1ARB+YqiWca2mjRPLTPcKJEURDVddQ423el0Q
4JNxS5icu7T2zYTYHAo/cT9zVdLZl0xuLxYm3asK1IONJ/evxyVZima3il6MPvhe
58Hwz+m+HdqHxi24b/1J/VKYbISG4huOQCdLzeNXgvwFlGPUmHSnnKo1/KbQDAR5
llG/Sw5+FquFuChaA6l5KWy7F3bQyA==
-----END CERTIFICATE-----`
const clientKey string = `-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA4yWJpbI0RQkozfu9YKXlsa5veUyJzJECoZDJj+rEP3IoozYV
u5xVyZaaDm+9OpBWXmuVD5zsYjw4Pqm2YWXxrbpygSSLtsWvxuSlLIFIRzmnbttn
Q5EVKoEdYdBuv0+TE8thbpPeBlhS1Fi/c5CAbEteW9SHzmf+ZB+5Hc3UWCZKEega
0AWL+G3qV5drQyz6M1IMsKZekmbUYgSqDIpfsrf6De/Oj1yhiperhdNrwR+za/xI
LCpZ8ELesd+Cz/kMsHxZPgq6XxDPGTK3NxoFB9F/OFJXpuSCbuhPO3b/V5FGx/hi
dfMsT1iQIL2wCf47DOp/5NgLUpGW5YN+qoFMlQIDAQABAoIBAQCzy4u312XeW1Cs
Mx6EuOwmh59/ESFmBkZh4rxZKYgrfE5EWlQ7i5SwG4BX+wR6rbNfy6JSmHDXlTkk
CKvvToVNcW6fYHEivDnVojhIERFIJ4+rhQmpBtcNLOQ3/4cZ8X/GxE6b+3lb5l+x
64mnjPLKRaIr5/+TVuebEy0xNTJmjnJ7yiB2HRz7uXEQaVSk/P7KAkkyl/9J3/LM
8N9AX1w6qDaNQZ4/P0++1H4SQenosM/b/GqGTomarEk/GE0NcB9rzmR9VCXa7FRh
WV5jyt9vUrwIEiK/6nUnOkGO8Ei3kB7Y+e+2m6WdaNoU5RAfqXmXa0Q/a0lLRruf
vTMo2WrBAoGBAPRaK4cx76Q+3SJ/wfznaPsMM06OSR8A3ctKdV+ip/lyKtb1W8Pz
k8MYQDH7GwPtSu5QD8doL00pPjugZL/ba7X9nAsI+pinyEErfnB9y7ORNEjIYYzs
DiqDKup7ANgw1gZvznWvb9Ge0WUSXvWS0pFkgootQAf+RmnnbWGH6l6RAoGBAO35
aGUrLro5u9RD24uSXNU3NmojINIQFK5dHAT3yl0BBYstL43AEsye9lX95uMPTvOQ
Cqcn42Hjp/bSe3n0ObyOZeXVrWcDFAfE0wwB1BkvL1lpgnFO9+VQORlH4w3Ppnpo
jcPkR2TFeDaAYtvckhxe/Bk3OnuFmnsQ3VzM75fFAoGBAI6PvS2XeNU+yA3EtA01
hg5SQ+zlHswz2TMuMeSmJZJnhY78f5mHlwIQOAPxGQXlf/4iP9J7en1uPpzTK3S0
M9duK4hUqMA/w5oiIhbHjf0qDnMYVbG+V1V+SZ+cPBXmCDihKreGr5qBKnHpkfV8
v9WL6o1rcRw4wiQvnaV1gsvBAoGBALtzVTczr6gDKCAIn5wuWy+cQSGTsBunjRLX
xuVm5iEiV+KMYkPvAx/pKzMLP96lRVR3ptyKgAKwl7LFk3u50+zh4gQLr35QH2wL
Lw7rNc3srAhrItPsFzqrWX6/cGuFoKYVS239l/sZzRppQPXcpb7xVvTp2whHcir0
Wtnpl+TdAoGAGqKqo2KU3JoY3IuTDUk1dsNAm8jd9EWDh+s1x4aG4N79mwcss5GD
FF8MbFPneK7xQd8L6HisKUDAUi2NOyynM81LAftPkvN6ZuUVeFDfCL4vCA0HUXLD
+VrOhtUZkNNJlLMiVRJuQKUOGlg8PpObqYbstQAf/0/yFJMRHG82Tcg=
-----END RSA PRIVATE KEY-----`
......@@ -5,10 +5,9 @@ import (
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/plugins"
//"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/middleware"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/util"
)
......
......@@ -8,6 +8,7 @@ import (
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/metrics"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/tsdb/testdata"
"github.com/grafana/grafana/pkg/util"
......@@ -25,9 +26,9 @@ func QueryMetrics(c *middleware.Context, reqDto dtos.MetricRequest) Response {
MaxDataPoints: query.Get("maxDataPoints").MustInt64(100),
IntervalMs: query.Get("intervalMs").MustInt64(1000),
Model: query,
DataSource: &tsdb.DataSourceInfo{
DataSource: &models.DataSource{
Name: "Grafana TestDataDB",
PluginId: "grafana-testdata-datasource",
Type: "grafana-testdata-datasource",
},
})
}
......
package models
import (
"crypto/tls"
"crypto/x509"
"net"
"net/http"
"sync"
"time"
)
type proxyTransportCache struct {
cache map[int64]cachedTransport
sync.Mutex
}
type cachedTransport struct {
updated time.Time
*http.Transport
}
var ptc = proxyTransportCache{
cache: make(map[int64]cachedTransport),
}
func (ds *DataSource) GetHttpClient() (*http.Client, error) {
transport, err := ds.GetHttpTransport()
if err != nil {
return nil, err
}
return &http.Client{
Timeout: time.Duration(30 * time.Second),
Transport: transport,
}, nil
}
func (ds *DataSource) GetHttpTransport() (*http.Transport, error) {
ptc.Lock()
defer ptc.Unlock()
if t, present := ptc.cache[ds.Id]; present && ds.Updated.Equal(t.updated) {
return t.Transport, nil
}
transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
}
var tlsAuth, tlsAuthWithCACert bool
if ds.JsonData != nil {
tlsAuth = ds.JsonData.Get("tlsAuth").MustBool(false)
tlsAuthWithCACert = ds.JsonData.Get("tlsAuthWithCACert").MustBool(false)
}
if tlsAuth {
transport.TLSClientConfig.InsecureSkipVerify = false
decrypted := ds.SecureJsonData.Decrypt()
if tlsAuthWithCACert && len(decrypted["tlsCACert"]) > 0 {
caPool := x509.NewCertPool()
ok := caPool.AppendCertsFromPEM([]byte(decrypted["tlsCACert"]))
if ok {
transport.TLSClientConfig.RootCAs = caPool
}
}
cert, err := tls.X509KeyPair([]byte(decrypted["tlsClientCert"]), []byte(decrypted["tlsClientKey"]))
if err != nil {
return nil, err
}
transport.TLSClientConfig.Certificates = []tls.Certificate{cert}
}
ptc.cache[ds.Id] = cachedTransport{
Transport: transport,
updated: ds.Updated,
}
return transport, nil
}
package models
import (
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
)
func TestDataSourceCache(t *testing.T) {
Convey("When caching a datasource proxy", t, func() {
clearCache()
ds := DataSource{
Id: 1,
Url: "http://k8s:8001",
Type: "Kubernetes",
}
t1, err := ds.GetHttpTransport()
So(err, ShouldBeNil)
t2, err := ds.GetHttpTransport()
So(err, ShouldBeNil)
Convey("Should be using the cached proxy", func() {
So(t2, ShouldEqual, t1)
})
})
Convey("When getting kubernetes datasource proxy", t, func() {
clearCache()
setting.SecretKey = "password"
json := simplejson.New()
json.Set("tlsAuth", true)
json.Set("tlsAuthWithCACert", true)
t := time.Now()
ds := DataSource{
Url: "http://k8s:8001",
Type: "Kubernetes",
Updated: t.Add(-2 * time.Minute),
}
transport, err := ds.GetHttpTransport()
So(err, ShouldBeNil)
Convey("Should have no cert", func() {
So(transport.TLSClientConfig.InsecureSkipVerify, ShouldEqual, true)
})
ds.JsonData = json
ds.SecureJsonData = map[string][]byte{
"tlsCACert": util.Encrypt([]byte(caCert), "password"),
"tlsClientCert": util.Encrypt([]byte(clientCert), "password"),
"tlsClientKey": util.Encrypt([]byte(clientKey), "password"),
}
ds.Updated = t.Add(-1 * time.Minute)
transport, err = ds.GetHttpTransport()
So(err, ShouldBeNil)
Convey("Should add cert", func() {
So(transport.TLSClientConfig.InsecureSkipVerify, ShouldEqual, false)
So(len(transport.TLSClientConfig.Certificates), ShouldEqual, 1)
})
ds.JsonData = nil
ds.SecureJsonData = map[string][]byte{}
ds.Updated = t
transport, err = ds.GetHttpTransport()
So(err, ShouldBeNil)
Convey("Should remove cert", func() {
So(transport.TLSClientConfig.InsecureSkipVerify, ShouldEqual, true)
So(len(transport.TLSClientConfig.Certificates), ShouldEqual, 0)
})
})
}
func clearCache() {
ptc.Lock()
defer ptc.Unlock()
ptc.cache = make(map[int64]cachedTransport)
}
const caCert string = `-----BEGIN CERTIFICATE-----
MIIDATCCAemgAwIBAgIJAMQ5hC3CPDTeMA0GCSqGSIb3DQEBCwUAMBcxFTATBgNV
BAMMDGNhLWs4cy1zdGhsbTAeFw0xNjEwMjcwODQyMjdaFw00NDAzMTQwODQyMjda
MBcxFTATBgNVBAMMDGNhLWs4cy1zdGhsbTCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBAMLe2AmJ6IleeUt69vgNchOjjmxIIxz5sp1vFu94m1vUip7CqnOg
QkpUsHeBPrGYv8UGloARCL1xEWS+9FVZeXWQoDmbC0SxXhFwRIESNCET7Q8KMi/4
4YPvnMLGZi3Fjwxa8BdUBCN1cx4WEooMVTWXm7RFMtZgDfuOAn3TNXla732sfT/d
1HNFrh48b0wA+HhmA3nXoBnBEblA665hCeo7lIAdRr0zJxJpnFnWXkyTClsAUTMN
iL905LdBiiIRenojipfKXvMz88XSaWTI7JjZYU3BvhyXndkT6f12cef3I96NY3WJ
0uIK4k04WrbzdYXMU3rN6NqlvbHqnI+E7aMCAwEAAaNQME4wHQYDVR0OBBYEFHHx
2+vSPw9bECHj3O51KNo5VdWOMB8GA1UdIwQYMBaAFHHx2+vSPw9bECHj3O51KNo5
VdWOMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAH2eV5NcV3LBJHs9
I+adbiTPg2vyumrGWwy73T0X8Dtchgt8wU7Q9b9Ucg2fOTmSSyS0iMqEu1Yb2ORB
CknM9mixHC9PwEBbkGCom3VVkqdLwSP6gdILZgyLoH4i8sTUz+S1yGPepi+Vzhs7
adOXtryjcGnwft6HdfKPNklMOHFnjw6uqpho54oj/z55jUpicY/8glDHdrr1bh3k
MHuiWLGewHXPvxfG6UoUx1te65IhifVcJGFZDQwfEmhBflfCmtAJlZEsgTLlBBCh
FHoXIyGOdq1chmRVocdGBCF8fUoGIbuF14r53rpvcbEKtKnnP8+96luKAZLq0a4n
3lb92xM=
-----END CERTIFICATE-----`
const clientCert string = `-----BEGIN CERTIFICATE-----
MIICsjCCAZoCCQCcd8sOfstQLzANBgkqhkiG9w0BAQsFADAXMRUwEwYDVQQDDAxj
YS1rOHMtc3RobG0wHhcNMTYxMTAyMDkyNTE1WhcNMTcxMTAyMDkyNTE1WjAfMR0w
GwYDVQQDDBRhZG0tZGFuaWVsLWs4cy1zdGhsbTCCASIwDQYJKoZIhvcNAQEBBQAD
ggEPADCCAQoCggEBAOMliaWyNEUJKM37vWCl5bGub3lMicyRAqGQyY/qxD9yKKM2
FbucVcmWmg5vvTqQVl5rlQ+c7GI8OD6ptmFl8a26coEki7bFr8bkpSyBSEc5p27b
Z0ORFSqBHWHQbr9PkxPLYW6T3gZYUtRYv3OQgGxLXlvUh85n/mQfuR3N1FgmShHo
GtAFi/ht6leXa0Ms+jNSDLCmXpJm1GIEqgyKX7K3+g3vzo9coYqXq4XTa8Efs2v8
SCwqWfBC3rHfgs/5DLB8WT4Kul8QzxkytzcaBQfRfzhSV6bkgm7oTzt2/1eRRsf4
YnXzLE9YkCC9sAn+Owzqf+TYC1KRluWDfqqBTJUCAwEAATANBgkqhkiG9w0BAQsF
AAOCAQEAdMsZg6edWGC+xngizn0uamrUg1ViaDqUsz0vpzY5NWLA4MsBc4EtxWRP
ueQvjUimZ3U3+AX0YWNLIrH1FCVos2jdij/xkTUmHcwzr8rQy+B17cFi+a8jtpgw
AU6WWoaAIEhhbWQfth/Diz3mivl1ARB+YqiWca2mjRPLTPcKJEURDVddQ423el0Q
4JNxS5icu7T2zYTYHAo/cT9zVdLZl0xuLxYm3asK1IONJ/evxyVZima3il6MPvhe
58Hwz+m+HdqHxi24b/1J/VKYbISG4huOQCdLzeNXgvwFlGPUmHSnnKo1/KbQDAR5
llG/Sw5+FquFuChaA6l5KWy7F3bQyA==
-----END CERTIFICATE-----`
const clientKey string = `-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA4yWJpbI0RQkozfu9YKXlsa5veUyJzJECoZDJj+rEP3IoozYV
u5xVyZaaDm+9OpBWXmuVD5zsYjw4Pqm2YWXxrbpygSSLtsWvxuSlLIFIRzmnbttn
Q5EVKoEdYdBuv0+TE8thbpPeBlhS1Fi/c5CAbEteW9SHzmf+ZB+5Hc3UWCZKEega
0AWL+G3qV5drQyz6M1IMsKZekmbUYgSqDIpfsrf6De/Oj1yhiperhdNrwR+za/xI
LCpZ8ELesd+Cz/kMsHxZPgq6XxDPGTK3NxoFB9F/OFJXpuSCbuhPO3b/V5FGx/hi
dfMsT1iQIL2wCf47DOp/5NgLUpGW5YN+qoFMlQIDAQABAoIBAQCzy4u312XeW1Cs
Mx6EuOwmh59/ESFmBkZh4rxZKYgrfE5EWlQ7i5SwG4BX+wR6rbNfy6JSmHDXlTkk
CKvvToVNcW6fYHEivDnVojhIERFIJ4+rhQmpBtcNLOQ3/4cZ8X/GxE6b+3lb5l+x
64mnjPLKRaIr5/+TVuebEy0xNTJmjnJ7yiB2HRz7uXEQaVSk/P7KAkkyl/9J3/LM
8N9AX1w6qDaNQZ4/P0++1H4SQenosM/b/GqGTomarEk/GE0NcB9rzmR9VCXa7FRh
WV5jyt9vUrwIEiK/6nUnOkGO8Ei3kB7Y+e+2m6WdaNoU5RAfqXmXa0Q/a0lLRruf
vTMo2WrBAoGBAPRaK4cx76Q+3SJ/wfznaPsMM06OSR8A3ctKdV+ip/lyKtb1W8Pz
k8MYQDH7GwPtSu5QD8doL00pPjugZL/ba7X9nAsI+pinyEErfnB9y7ORNEjIYYzs
DiqDKup7ANgw1gZvznWvb9Ge0WUSXvWS0pFkgootQAf+RmnnbWGH6l6RAoGBAO35
aGUrLro5u9RD24uSXNU3NmojINIQFK5dHAT3yl0BBYstL43AEsye9lX95uMPTvOQ
Cqcn42Hjp/bSe3n0ObyOZeXVrWcDFAfE0wwB1BkvL1lpgnFO9+VQORlH4w3Ppnpo
jcPkR2TFeDaAYtvckhxe/Bk3OnuFmnsQ3VzM75fFAoGBAI6PvS2XeNU+yA3EtA01
hg5SQ+zlHswz2TMuMeSmJZJnhY78f5mHlwIQOAPxGQXlf/4iP9J7en1uPpzTK3S0
M9duK4hUqMA/w5oiIhbHjf0qDnMYVbG+V1V+SZ+cPBXmCDihKreGr5qBKnHpkfV8
v9WL6o1rcRw4wiQvnaV1gsvBAoGBALtzVTczr6gDKCAIn5wuWy+cQSGTsBunjRLX
xuVm5iEiV+KMYkPvAx/pKzMLP96lRVR3ptyKgAKwl7LFk3u50+zh4gQLr35QH2wL
Lw7rNc3srAhrItPsFzqrWX6/cGuFoKYVS239l/sZzRppQPXcpb7xVvTp2whHcir0
Wtnpl+TdAoGAGqKqo2KU3JoY3IuTDUk1dsNAm8jd9EWDh+s1x4aG4N79mwcss5GD
FF8MbFPneK7xQd8L6HisKUDAUi2NOyynM81LAftPkvN6ZuUVeFDfCL4vCA0HUXLD
+VrOhtUZkNNJlLMiVRJuQKUOGlg8PpObqYbstQAf/0/yFJMRHG82Tcg=
-----END RSA PRIVATE KEY-----`
......@@ -121,19 +121,7 @@ func (c *QueryCondition) getRequestForAlertRule(datasource *m.DataSource, timeRa
{
RefId: "A",
Model: c.Query.Model,
DataSource: &tsdb.DataSourceInfo{
Id: datasource.Id,
Name: datasource.Name,
PluginId: datasource.Type,
Url: datasource.Url,
User: datasource.User,
Password: datasource.Password,
Database: datasource.Database,
BasicAuth: datasource.BasicAuth,
BasicAuthUser: datasource.BasicAuthUser,
BasicAuthPassword: datasource.BasicAuthPassword,
JsonData: datasource.JsonData,
},
DataSource: datasource,
},
},
}
......
package tsdb
import (
"context"
"errors"
)
import "context"
type Batch struct {
DataSourceId int64
......@@ -24,12 +21,12 @@ func newBatch(dsId int64, queries QuerySlice) *Batch {
}
func (bg *Batch) process(ctx context.Context, queryContext *QueryContext) {
executor := getExecutorFor(bg.Queries[0].DataSource)
executor, err := getExecutorFor(bg.Queries[0].DataSource)
if executor == nil {
if err != nil {
bg.Done = true
result := &BatchResult{
Error: errors.New("Could not find executor for data source type: " + bg.Queries[0].DataSource.PluginId),
Error: err,
QueryResults: make(map[string]*QueryResult),
}
for _, query := range bg.Queries {
......
package tsdb
import "context"
import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/models"
)
type Executor interface {
Execute(ctx context.Context, queries QuerySlice, query *QueryContext) *BatchResult
......@@ -8,17 +13,22 @@ type Executor interface {
var registry map[string]GetExecutorFn
type GetExecutorFn func(dsInfo *DataSourceInfo) Executor
type GetExecutorFn func(dsInfo *models.DataSource) (Executor, error)
func init() {
registry = make(map[string]GetExecutorFn)
}
func getExecutorFor(dsInfo *DataSourceInfo) Executor {
if fn, exists := registry[dsInfo.PluginId]; exists {
return fn(dsInfo)
func getExecutorFor(dsInfo *models.DataSource) (Executor, error) {
if fn, exists := registry[dsInfo.Type]; exists {
executor, err := fn(dsInfo)
if err != nil {
return nil, err
}
return executor, nil
}
return nil
return nil, fmt.Errorf("Could not find executor for data source type: %s", dsInfo.Type)
}
func RegisterExecutor(pluginId string, fn GetExecutorFn) {
......
package tsdb
import "context"
import (
"context"
"github.com/grafana/grafana/pkg/models"
)
type FakeExecutor struct {
results map[string]*QueryResult
......@@ -9,11 +13,11 @@ type FakeExecutor struct {
type ResultsFn func(context *QueryContext) *QueryResult
func NewFakeExecutor(dsInfo *DataSourceInfo) *FakeExecutor {
func NewFakeExecutor(dsInfo *models.DataSource) (*FakeExecutor, error) {
return &FakeExecutor{
results: make(map[string]*QueryResult),
resultsFn: make(map[string]ResultsFn),
}
}, nil
}
func (e *FakeExecutor) Execute(ctx context.Context, queries QuerySlice, context *QueryContext) *BatchResult {
......
......@@ -14,28 +14,36 @@ import (
"golang.org/x/net/context/ctxhttp"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
)
type GraphiteExecutor struct {
*tsdb.DataSourceInfo
*models.DataSource
HttpClient *http.Client
}
func NewGraphiteExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
return &GraphiteExecutor{dsInfo}
func NewGraphiteExecutor(datasource *models.DataSource) (tsdb.Executor, error) {
httpClient, err := datasource.GetHttpClient()
if err != nil {
return nil, err
}
return &GraphiteExecutor{
DataSource: datasource,
HttpClient: httpClient,
}, nil
}
var (
glog log.Logger
HttpClient *http.Client
)
func init() {
glog = log.New("tsdb.graphite")
tsdb.RegisterExecutor("graphite", NewGraphiteExecutor)
HttpClient = tsdb.GetDefaultClient()
}
func (e *GraphiteExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
......@@ -66,7 +74,7 @@ func (e *GraphiteExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
return result
}
res, err := ctxhttp.Do(ctx, HttpClient, req)
res, err := ctxhttp.Do(ctx, e.HttpClient, req)
if err != nil {
result.Error = err
return result
......
package tsdb
import (
"crypto/tls"
"net"
"net/http"
"time"
)
func GetDefaultClient() *http.Client {
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
return &http.Client{
Timeout: time.Duration(30 * time.Second),
Transport: tr,
}
}
......@@ -11,34 +11,40 @@ import (
"golang.org/x/net/context/ctxhttp"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
)
type InfluxDBExecutor struct {
*tsdb.DataSourceInfo
*models.DataSource
QueryParser *InfluxdbQueryParser
ResponseParser *ResponseParser
HttpClient *http.Client
}
func NewInfluxDBExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
func NewInfluxDBExecutor(datasource *models.DataSource) (tsdb.Executor, error) {
httpClient, err := datasource.GetHttpClient()
if err != nil {
return nil, err
}
return &InfluxDBExecutor{
DataSourceInfo: dsInfo,
DataSource: datasource,
QueryParser: &InfluxdbQueryParser{},
ResponseParser: &ResponseParser{},
}
HttpClient: httpClient,
}, nil
}
var (
glog log.Logger
HttpClient *http.Client
)
func init() {
glog = log.New("tsdb.influxdb")
tsdb.RegisterExecutor("influxdb", NewInfluxDBExecutor)
HttpClient = tsdb.GetDefaultClient()
}
func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult {
......@@ -63,7 +69,7 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
return result.WithError(err)
}
resp, err := ctxhttp.Do(ctx, HttpClient, req)
resp, err := ctxhttp.Do(ctx, e.HttpClient, req)
if err != nil {
return result.WithError(err)
}
......@@ -95,7 +101,7 @@ func (e *InfluxDBExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
func (e *InfluxDBExecutor) getQuery(queries tsdb.QuerySlice, context *tsdb.QueryContext) (*Query, error) {
for _, v := range queries {
query, err := e.QueryParser.Parse(v.Model, e.DataSourceInfo)
query, err := e.QueryParser.Parse(v.Model, e.DataSource)
if err != nil {
return nil, err
}
......
......@@ -4,12 +4,12 @@ import (
"strconv"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/models"
)
type InfluxdbQueryParser struct{}
func (qp *InfluxdbQueryParser) Parse(model *simplejson.Json, dsInfo *tsdb.DataSourceInfo) (*Query, error) {
func (qp *InfluxdbQueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource) (*Query, error) {
policy := model.Get("policy").MustString("default")
rawQuery := model.Get("query").MustString("")
useRawQuery := model.Get("rawQuery").MustBool(false)
......
......@@ -4,7 +4,7 @@ import (
"testing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/models"
. "github.com/smartystreets/goconvey/convey"
)
......@@ -12,7 +12,7 @@ func TestInfluxdbQueryParser(t *testing.T) {
Convey("Influxdb query parser", t, func() {
parser := &InfluxdbQueryParser{}
dsInfo := &tsdb.DataSourceInfo{
dsInfo := &models.DataSource{
JsonData: simplejson.New(),
}
......
......@@ -2,6 +2,7 @@ package tsdb
import (
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"gopkg.in/guregu/null.v3"
)
......@@ -9,7 +10,7 @@ type Query struct {
RefId string
Model *simplejson.Json
Depends []string
DataSource *DataSourceInfo
DataSource *models.DataSource
Results []*TimeSeries
Exclude bool
MaxDataPoints int64
......@@ -28,20 +29,6 @@ type Response struct {
Results map[string]*QueryResult `json:"results"`
}
type DataSourceInfo struct {
Id int64
Name string
PluginId string
Url string
Password string
User string
Database string
BasicAuth bool
BasicAuthUser string
BasicAuthPassword string
JsonData *simplejson.Json
}
type BatchTiming struct {
TimeElapsed int64
}
......
......@@ -17,28 +17,36 @@ import (
"gopkg.in/guregu/null.v3"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
)
type OpenTsdbExecutor struct {
*tsdb.DataSourceInfo
*models.DataSource
httpClient *http.Client
}
func NewOpenTsdbExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
return &OpenTsdbExecutor{dsInfo}
func NewOpenTsdbExecutor(datasource *models.DataSource) (tsdb.Executor, error) {
httpClient, err := datasource.GetHttpClient()
if err != nil {
return nil, err
}
return &OpenTsdbExecutor{
DataSource: datasource,
httpClient: httpClient,
}, nil
}
var (
plog log.Logger
HttpClient *http.Client
)
func init() {
plog = log.New("tsdb.opentsdb")
tsdb.RegisterExecutor("opentsdb", NewOpenTsdbExecutor)
HttpClient = tsdb.GetDefaultClient()
}
func (e *OpenTsdbExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
......@@ -64,7 +72,7 @@ func (e *OpenTsdbExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice,
return result
}
res, err := ctxhttp.Do(ctx, HttpClient, req)
res, err := ctxhttp.Do(ctx, e.httpClient, req)
if err != nil {
result.Error = err
return result
......
......@@ -9,18 +9,30 @@ import (
"gopkg.in/guregu/null.v3"
"net/http"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/prometheus/client_golang/api/prometheus"
pmodel "github.com/prometheus/common/model"
)
type PrometheusExecutor struct {
*tsdb.DataSourceInfo
*models.DataSource
Transport *http.Transport
}
func NewPrometheusExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
return &PrometheusExecutor{dsInfo}
func NewPrometheusExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) {
transport, err := dsInfo.GetHttpTransport()
if err != nil {
return nil, err
}
return &PrometheusExecutor{
DataSource: dsInfo,
Transport: transport,
}, nil
}
var (
......@@ -36,7 +48,8 @@ func init() {
func (e *PrometheusExecutor) getClient() (prometheus.QueryAPI, error) {
cfg := prometheus.Config{
Address: e.DataSourceInfo.Url,
Address: e.DataSource.Url,
Transport: e.Transport,
}
client, err := prometheus.New(cfg)
......
......@@ -4,19 +4,20 @@ import (
"context"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb"
)
type TestDataExecutor struct {
*tsdb.DataSourceInfo
*models.DataSource
log log.Logger
}
func NewTestDataExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor {
func NewTestDataExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) {
return &TestDataExecutor{
DataSourceInfo: dsInfo,
DataSource: dsInfo,
log: log.New("tsdb.testdata"),
}
}, nil
}
func init() {
......
......@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/grafana/grafana/pkg/models"
. "github.com/smartystreets/goconvey/convey"
)
......@@ -15,9 +16,9 @@ func TestMetricQuery(t *testing.T) {
Convey("Given 3 queries for 2 data sources", func() {
request := &Request{
Queries: QuerySlice{
{RefId: "A", DataSource: &DataSourceInfo{Id: 1}},
{RefId: "B", DataSource: &DataSourceInfo{Id: 1}},
{RefId: "C", DataSource: &DataSourceInfo{Id: 2}},
{RefId: "A", DataSource: &models.DataSource{Id: 1}},
{RefId: "B", DataSource: &models.DataSource{Id: 1}},
{RefId: "C", DataSource: &models.DataSource{Id: 2}},
},
}
......@@ -32,9 +33,9 @@ func TestMetricQuery(t *testing.T) {
Convey("Given query 2 depends on query 1", func() {
request := &Request{
Queries: QuerySlice{
{RefId: "A", DataSource: &DataSourceInfo{Id: 1}},
{RefId: "B", DataSource: &DataSourceInfo{Id: 2}},
{RefId: "C", DataSource: &DataSourceInfo{Id: 3}, Depends: []string{"A", "B"}},
{RefId: "A", DataSource: &models.DataSource{Id: 1}},
{RefId: "B", DataSource: &models.DataSource{Id: 2}},
{RefId: "C", DataSource: &models.DataSource{Id: 3}, Depends: []string{"A", "B"}},
},
}
......@@ -56,7 +57,7 @@ func TestMetricQuery(t *testing.T) {
Convey("When executing request with one query", t, func() {
req := &Request{
Queries: QuerySlice{
{RefId: "A", DataSource: &DataSourceInfo{Id: 1, PluginId: "test"}},
{RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}},
},
}
......@@ -75,8 +76,8 @@ func TestMetricQuery(t *testing.T) {
Convey("When executing one request with two queries from same data source", t, func() {
req := &Request{
Queries: QuerySlice{
{RefId: "A", DataSource: &DataSourceInfo{Id: 1, PluginId: "test"}},
{RefId: "B", DataSource: &DataSourceInfo{Id: 1, PluginId: "test"}},
{RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}},
{RefId: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}},
},
}
......@@ -101,9 +102,9 @@ func TestMetricQuery(t *testing.T) {
Convey("When executing one request with three queries from different datasources", t, func() {
req := &Request{
Queries: QuerySlice{
{RefId: "A", DataSource: &DataSourceInfo{Id: 1, PluginId: "test"}},
{RefId: "B", DataSource: &DataSourceInfo{Id: 1, PluginId: "test"}},
{RefId: "C", DataSource: &DataSourceInfo{Id: 2, PluginId: "test"}},
{RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}},
{RefId: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}},
{RefId: "C", DataSource: &models.DataSource{Id: 2, Type: "test"}},
},
}
......@@ -118,7 +119,7 @@ func TestMetricQuery(t *testing.T) {
Convey("When query uses data source of unknown type", t, func() {
req := &Request{
Queries: QuerySlice{
{RefId: "A", DataSource: &DataSourceInfo{Id: 1, PluginId: "asdasdas"}},
{RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "asdasdas"}},
},
}
......@@ -130,10 +131,10 @@ func TestMetricQuery(t *testing.T) {
req := &Request{
Queries: QuerySlice{
{
RefId: "A", DataSource: &DataSourceInfo{Id: 1, PluginId: "test"},
RefId: "A", DataSource: &models.DataSource{Id: 1, Type: "test"},
},
{
RefId: "B", DataSource: &DataSourceInfo{Id: 2, PluginId: "test"}, Depends: []string{"A"},
RefId: "B", DataSource: &models.DataSource{Id: 2, Type: "test"}, Depends: []string{"A"},
},
},
}
......@@ -167,9 +168,9 @@ func TestMetricQuery(t *testing.T) {
}
func registerFakeExecutor() *FakeExecutor {
executor := NewFakeExecutor(nil)
RegisterExecutor("test", func(dsInfo *DataSourceInfo) Executor {
return executor
executor, _ := NewFakeExecutor(nil)
RegisterExecutor("test", func(dsInfo *models.DataSource) (Executor, error) {
return executor, nil
})
return executor
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment