Commit 0390b560 by Marcus Efraimsson Committed by GitHub

Backend plugins: Implement support for resources (#21805)

Implements initial support for resources using v0.14.0 of SDK.

Ref #21512
parent 53458681
......@@ -32,7 +32,7 @@ require (
github.com/gorilla/websocket v1.4.1
github.com/gosimple/slug v1.4.2
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.11.0
github.com/grafana/grafana-plugin-sdk-go v0.14.0
github.com/hashicorp/go-hclog v0.8.0
github.com/hashicorp/go-plugin v1.0.1
github.com/hashicorp/go-version v1.1.0
......
......@@ -9,8 +9,6 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/apache/arrow/go/arrow v0.0.0-20190716210558-5f564424c71c h1:iHUHzx3S1TU5xt+D7vLb0PAk3e+RfayF9IhR6+hyO/k=
github.com/apache/arrow/go/arrow v0.0.0-20190716210558-5f564424c71c/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0=
github.com/apache/arrow/go/arrow v0.0.0-20191025121910-b789226ccb21 h1:xI+FQ/TsyD7jy22vP9wQBtJsPFRQTLZHfrr1IMLl0g0=
github.com/apache/arrow/go/arrow v0.0.0-20191025121910-b789226ccb21/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0=
github.com/aws/aws-sdk-go v1.25.48 h1:J82DYDGZHOKHdhx6hD24Tm30c2C3GchYGfN0mf9iKUk=
......@@ -127,10 +125,8 @@ github.com/gosimple/slug v1.4.2 h1:jDmprx3q/9Lfk4FkGZtvzDQ9Cj9eAmsjzeQGp24PeiQ=
github.com/gosimple/slug v1.4.2/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SPdxCL9BChFTlyi0Khv64vdCW4TMna8+sxL7+Chx+Ag=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To=
github.com/grafana/grafana-plugin-sdk-go v0.6.0 h1:rMMp84CdEKxI4i6SEpIUgsHgcKa1JdXDjLrDDw6QhXI=
github.com/grafana/grafana-plugin-sdk-go v0.6.0/go.mod h1:0eV4vNOYNZKOBbl23MieYiLa2y8M8S3D6ytLdggctqo=
github.com/grafana/grafana-plugin-sdk-go v0.11.0 h1:MkefJEWDMYk86eHUzCatTfK2/atAYNN1FncBqP5mx20=
github.com/grafana/grafana-plugin-sdk-go v0.11.0/go.mod h1:Dgc2ygLBJXTTIcfIgCGG6mxRcC/aZ36rKZ1tq5B5XmY=
github.com/grafana/grafana-plugin-sdk-go v0.14.0 h1:7Y34uhBkmkZ1ywYNfTXYXZihRgxgCf9WQbad4iqJ+t8=
github.com/grafana/grafana-plugin-sdk-go v0.14.0/go.mod h1:Dgc2ygLBJXTTIcfIgCGG6mxRcC/aZ36rKZ1tq5B5XmY=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI=
......
......@@ -251,6 +251,9 @@ func (hs *HTTPServer) registerRoutes() {
apiRoute.Get("/plugins", Wrap(hs.GetPluginList))
apiRoute.Get("/plugins/:pluginId/settings", Wrap(GetPluginSettingByID))
apiRoute.Get("/plugins/:pluginId/markdown/:name", Wrap(GetPluginMarkdown))
apiRoute.Get("/plugins/:pluginId/health", Wrap(hs.CheckHealth))
apiRoute.Any("/plugins/:pluginId/resources", Wrap(hs.CallResource))
apiRoute.Any("/plugins/:pluginId/resources/*", Wrap(hs.CallResource))
apiRoute.Group("/plugins", func(pluginRoute routing.RouteRegister) {
pluginRoute.Get("/:pluginId/dashboards/", Wrap(GetPluginDashboards))
......@@ -260,6 +263,8 @@ func (hs *HTTPServer) registerRoutes() {
apiRoute.Get("/frontend/settings/", hs.GetFrontendSettings)
apiRoute.Any("/datasources/proxy/:id/*", reqSignedIn, hs.ProxyDataSourceRequest)
apiRoute.Any("/datasources/proxy/:id", reqSignedIn, hs.ProxyDataSourceRequest)
apiRoute.Any("/datasources/:id/resources", Wrap(hs.CallDatasourceResource))
apiRoute.Any("/datasources/:id/resources/*", Wrap(hs.CallDatasourceResource))
// Folders
apiRoute.Group("/folders", func(folderRoute routing.RouteRegister) {
......
......@@ -7,6 +7,7 @@ import (
"github.com/grafana/grafana/pkg/bus"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/util"
)
......@@ -253,6 +254,60 @@ func GetDataSourceIdByName(c *m.ReqContext) Response {
return JSON(200, &dtos)
}
// /api/datasources/:id/resources/*
func (hs *HTTPServer) CallDatasourceResource(c *m.ReqContext) Response {
datasourceID := c.ParamsInt64(":id")
ds, err := hs.DatasourceCache.GetDatasource(datasourceID, c.SignedInUser, c.SkipCache)
if err != nil {
if err == m.ErrDataSourceAccessDenied {
return Error(403, "Access denied to datasource", err)
}
return Error(500, "Unable to load datasource meta data", err)
}
// find plugin
plugin, ok := plugins.DataSources[ds.Type]
if !ok {
return Error(500, "Unable to find datasource plugin", err)
}
body, err := c.Req.Body().Bytes()
if err != nil {
return Error(500, "Failed to read request body", err)
}
req := backendplugin.CallResourceRequest{
Config: backendplugin.PluginConfig{
OrgID: c.OrgId,
PluginID: plugin.Id,
Instance: &backendplugin.PluginInstance{
ID: ds.Id,
Name: ds.Name,
Type: ds.Type,
URL: ds.Url,
},
},
Path: c.Params("*"),
Method: c.Req.Method,
URL: c.Req.URL.String(),
Headers: c.Req.Header.Clone(),
Body: body,
}
resp, err := hs.BackendPluginManager.CallResource(c.Req.Context(), req)
if err != nil {
return Error(500, "Failed to call datasource resource", err)
}
if resp.Status >= 400 {
return Error(resp.Status, "", nil)
}
return &NormalResponse{
body: resp.Body,
status: resp.Status,
header: resp.Headers,
}
}
func convertModelToDtos(ds *m.DataSource) dtos.DataSource {
dto := dtos.DataSource{
Id: ds.Id,
......
......@@ -10,6 +10,8 @@ import (
"path"
"sync"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/api/live"
"github.com/grafana/grafana/pkg/api/routing"
httpstatic "github.com/grafana/grafana/pkg/api/static"
......@@ -57,19 +59,20 @@ type HTTPServer struct {
streamManager *live.StreamManager
httpSrv *http.Server
RouteRegister routing.RouteRegister `inject:""`
Bus bus.Bus `inject:""`
RenderService rendering.Service `inject:""`
Cfg *setting.Cfg `inject:""`
HooksService *hooks.HooksService `inject:""`
CacheService *localcache.CacheService `inject:""`
DatasourceCache datasources.CacheService `inject:""`
AuthTokenService models.UserTokenService `inject:""`
QuotaService *quota.QuotaService `inject:""`
RemoteCacheService *remotecache.RemoteCache `inject:""`
ProvisioningService ProvisioningService `inject:""`
Login *login.LoginService `inject:""`
License models.Licensing `inject:""`
RouteRegister routing.RouteRegister `inject:""`
Bus bus.Bus `inject:""`
RenderService rendering.Service `inject:""`
Cfg *setting.Cfg `inject:""`
HooksService *hooks.HooksService `inject:""`
CacheService *localcache.CacheService `inject:""`
DatasourceCache datasources.CacheService `inject:""`
AuthTokenService models.UserTokenService `inject:""`
QuotaService *quota.QuotaService `inject:""`
RemoteCacheService *remotecache.RemoteCache `inject:""`
ProvisioningService ProvisioningService `inject:""`
Login *login.LoginService `inject:""`
License models.Licensing `inject:""`
BackendPluginManager backendplugin.Manager `inject:""`
}
func (hs *HTTPServer) Init() error {
......
......@@ -3,6 +3,8 @@ package api
import (
"sort"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/bus"
m "github.com/grafana/grafana/pkg/models"
......@@ -200,3 +202,74 @@ func ImportDashboard(c *m.ReqContext, apiCmd dtos.ImportDashboardCommand) Respon
return JSON(200, cmd.Result)
}
// /api/plugins/:pluginId/health
func (hs *HTTPServer) CheckHealth(c *m.ReqContext) Response {
pluginID := c.Params("pluginId")
resp, err := hs.BackendPluginManager.CheckHealth(c.Req.Context(), pluginID)
if err != nil {
if err == backendplugin.ErrPluginNotRegistered {
return Error(404, "Plugin not found", err)
}
// Return status unknown instead?
if err == backendplugin.ErrDiagnosticsNotSupported {
return Error(404, "Health check not implemented", err)
}
// Return status unknown or error instead?
if err == backendplugin.ErrHealthCheckFailed {
return Error(500, "Plugin health check failed", err)
}
}
payload := map[string]interface{}{
"status": resp.Status.String(),
"info": resp.Info,
}
if resp.Status != backendplugin.HealthStatusOk {
return JSON(503, payload)
}
return JSON(200, payload)
}
// /api/plugins/:pluginId/resources/*
func (hs *HTTPServer) CallResource(c *m.ReqContext) Response {
pluginID := c.Params("pluginId")
_, exists := plugins.Plugins[pluginID]
if !exists {
return Error(404, "Plugin not found, no installed plugin with that id", nil)
}
body, err := c.Req.Body().Bytes()
if err != nil {
return Error(500, "Failed to read request body", err)
}
req := backendplugin.CallResourceRequest{
Config: backendplugin.PluginConfig{
OrgID: c.OrgId,
PluginID: pluginID,
},
Path: c.Params("*"),
Method: c.Req.Method,
URL: c.Req.URL.String(),
Headers: c.Req.Header.Clone(),
Body: body,
}
resp, err := hs.BackendPluginManager.CallResource(c.Req.Context(), req)
if err != nil {
return Error(500, "Failed to call resource", err)
}
if resp.Status >= 400 {
return Error(resp.Status, "", nil)
}
return &NormalResponse{
body: resp.Body,
status: resp.Status,
header: resp.Headers,
}
}
......@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/prometheus/client_golang/prometheus"
......@@ -14,7 +15,6 @@ import (
datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource"
rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins/backendplugin/collector"
"github.com/grafana/grafana/pkg/util/errutil"
......@@ -31,7 +31,8 @@ type BackendPlugin struct {
client *plugin.Client
logger log.Logger
startFns PluginStartFuncs
diagnostics backend.DiagnosticsPlugin
diagnostics DiagnosticsPlugin
core CorePlugin
}
func (p *BackendPlugin) start(ctx context.Context) error {
......@@ -61,20 +62,21 @@ func (p *BackendPlugin) start(ctx context.Context) error {
}
if rawDiagnostics != nil {
if plugin, ok := rawDiagnostics.(backend.DiagnosticsPlugin); ok {
if plugin, ok := rawDiagnostics.(DiagnosticsPlugin); ok {
p.diagnostics = plugin
}
}
client = &Client{}
if rawBackend != nil {
if plugin, ok := rawBackend.(backend.BackendPlugin); ok {
client.BackendPlugin = plugin
if plugin, ok := rawBackend.(CorePlugin); ok {
p.core = plugin
client.DatasourcePlugin = plugin
}
}
if rawTransform != nil {
if plugin, ok := rawTransform.(backend.TransformPlugin); ok {
if plugin, ok := rawTransform.(TransformPlugin); ok {
client.TransformPlugin = plugin
}
}
......@@ -194,6 +196,47 @@ func (p *BackendPlugin) checkHealth(ctx context.Context) (*pluginv2.CheckHealth_
return res, nil
}
func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceRequest) (*CallResourceResult, error) {
p.logger.Debug("Calling resource", "path", req.Path, "method", req.Method)
reqHeaders := map[string]*pluginv2.CallResource_StringList{}
for k, v := range req.Headers {
reqHeaders[k] = &pluginv2.CallResource_StringList{Values: v}
}
protoReq := &pluginv2.CallResource_Request{
Config: &pluginv2.PluginConfig{},
Path: req.Path,
Method: req.Method,
Url: req.URL,
Headers: reqHeaders,
Body: req.Body,
}
protoResp, err := p.core.CallResource(ctx, protoReq)
if err != nil {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
return &CallResourceResult{
Status: http.StatusNotImplemented,
}, nil
}
}
return nil, errutil.Wrap("Failed to call resource", err)
}
respHeaders := map[string][]string{}
for key, values := range protoResp.Headers {
respHeaders[key] = values.Values
}
return &CallResourceResult{
Headers: respHeaders,
Body: protoResp.Body,
Status: int(protoResp.Code),
}, nil
}
// convertMetricFamily converts metric family to prometheus.Metric.
// Copied from https://github.com/prometheus/node_exporter/blob/3ddc82c2d8d11eec53ed5faa8db969a1bb81f8bb/collector/textfile.go#L66-L165
func convertMetricFamily(pluginID string, metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric, logger log.Logger) {
......
package backendplugin
import (
"context"
"os/exec"
"github.com/grafana/grafana-plugin-sdk-go/backend/plugin"
"github.com/grafana/grafana/pkg/infra/log"
datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource"
rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/hashicorp/go-plugin"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
goplugin "github.com/hashicorp/go-plugin"
)
const (
......@@ -19,24 +22,24 @@ const (
)
// Handshake is the HandshakeConfig used to configure clients and servers.
var handshake = plugin.HandshakeConfig{
var handshake = goplugin.HandshakeConfig{
// The ProtocolVersion is the version that must match between Grafana core
// and Grafana plugins. This should be bumped whenever a (breaking) change
// happens in one or the other that makes it so that they can't safely communicate.
ProtocolVersion: DefaultProtocolVersion,
// The magic cookie values should NEVER be changed.
MagicCookieKey: backend.MagicCookieKey,
MagicCookieValue: backend.MagicCookieValue,
MagicCookieKey: plugin.MagicCookieKey,
MagicCookieValue: plugin.MagicCookieValue,
}
func newClientConfig(executablePath string, logger log.Logger, versionedPlugins map[int]plugin.PluginSet) *plugin.ClientConfig {
return &plugin.ClientConfig{
func newClientConfig(executablePath string, logger log.Logger, versionedPlugins map[int]goplugin.PluginSet) *goplugin.ClientConfig {
return &goplugin.ClientConfig{
Cmd: exec.Command(executablePath),
HandshakeConfig: handshake,
VersionedPlugins: versionedPlugins,
Logger: logWrapper{Logger: logger},
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
AllowedProtocols: []goplugin.Protocol{goplugin.ProtocolGRPC},
}
}
......@@ -57,7 +60,7 @@ type PluginDescriptor struct {
pluginID string
executablePath string
managed bool
versionedPlugins map[int]plugin.PluginSet
versionedPlugins map[int]goplugin.PluginSet
startFns PluginStartFuncs
}
......@@ -68,14 +71,14 @@ func NewBackendPluginDescriptor(pluginID, executablePath string, startFns Plugin
pluginID: pluginID,
executablePath: executablePath,
managed: true,
versionedPlugins: map[int]plugin.PluginSet{
versionedPlugins: map[int]goplugin.PluginSet{
DefaultProtocolVersion: {
pluginID: &datasourceV1.DatasourcePluginImpl{},
},
backend.ProtocolVersion: {
"diagnostics": &backend.DiagnosticsGRPCPlugin{},
"backend": &backend.CoreGRPCPlugin{},
"transform": &backend.TransformGRPCPlugin{},
plugin.ProtocolVersion: {
"diagnostics": &plugin.DiagnosticsGRPCPlugin{},
"backend": &plugin.CoreGRPCPlugin{},
"transform": &plugin.TransformGRPCPlugin{},
},
},
startFns: startFns,
......@@ -89,7 +92,7 @@ func NewRendererPluginDescriptor(pluginID, executablePath string, startFns Plugi
pluginID: pluginID,
executablePath: executablePath,
managed: false,
versionedPlugins: map[int]plugin.PluginSet{
versionedPlugins: map[int]goplugin.PluginSet{
DefaultProtocolVersion: {
pluginID: &rendererV1.RendererPluginImpl{},
},
......@@ -98,6 +101,28 @@ func NewRendererPluginDescriptor(pluginID, executablePath string, startFns Plugi
}
}
type DiagnosticsPlugin interface {
CollectMetrics(ctx context.Context, req *pluginv2.CollectMetrics_Request) (*pluginv2.CollectMetrics_Response, error)
CheckHealth(ctx context.Context, req *pluginv2.CheckHealth_Request) (*pluginv2.CheckHealth_Response, error)
}
type DatasourcePlugin interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error)
}
type CorePlugin interface {
CallResource(ctx context.Context, req *pluginv2.CallResource_Request) (*pluginv2.CallResource_Response, error)
DatasourcePlugin
}
type TransformCallBack interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error)
}
type TransformPlugin interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest, callback TransformCallBack) (*pluginv2.DataQueryResponse, error)
}
// LegacyClient client for communicating with a plugin using the old plugin protocol.
type LegacyClient struct {
DatasourcePlugin datasourceV1.DatasourcePlugin
......@@ -106,7 +131,6 @@ type LegacyClient struct {
// Client client for communicating with a plugin using the current plugin protocol.
type Client struct {
DiagnosticsPlugin backend.DiagnosticsPlugin
BackendPlugin backend.BackendPlugin
TransformPlugin backend.TransformPlugin
DatasourcePlugin DatasourcePlugin
TransformPlugin TransformPlugin
}
package backendplugin
import (
"encoding/json"
"strconv"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
)
// HealthStatus is the status of the plugin.
type HealthStatus int
const (
// HealthStatusUnknown means the status of the plugin is unknown.
HealthStatusUnknown HealthStatus = iota
// HealthStatusOk means the status of the plugin is good.
HealthStatusOk
// HealthStatusError means the plugin is in an error state.
HealthStatusError
)
var healthStatusNames = map[int]string{
0: "UNKNOWN",
1: "OK",
2: "ERROR",
}
func (hs HealthStatus) String() string {
s, exists := healthStatusNames[int(hs)]
if exists {
return s
}
return strconv.Itoa(int(hs))
}
// CheckHealthResult check health result.
type CheckHealthResult struct {
Status HealthStatus
Info string
}
func checkHealthResultFromProto(protoResp *pluginv2.CheckHealth_Response) *CheckHealthResult {
status := HealthStatusUnknown
switch protoResp.Status {
case pluginv2.CheckHealth_Response_ERROR:
status = HealthStatusError
case pluginv2.CheckHealth_Response_OK:
status = HealthStatusOk
}
return &CheckHealthResult{
Status: status,
Info: protoResp.Info,
}
}
type PluginInstance struct {
ID int64
Name string
Type string
URL string
JSONData json.RawMessage
}
type PluginConfig struct {
PluginID string
OrgID int64
Instance *PluginInstance
}
type CallResourceRequest struct {
Config PluginConfig
Path string
Method string
URL string
Headers map[string][]string
Body []byte
}
// CallResourceResult call resource result.
type CallResourceResult struct {
Status int
Headers map[string][]string
Body []byte
}
......@@ -6,19 +6,24 @@ import (
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/backendplugin/collector"
"github.com/grafana/grafana/pkg/registry"
plugin "github.com/hashicorp/go-plugin"
"golang.org/x/xerrors"
)
var (
// ErrPluginNotRegistered error returned when plugin not registered.
ErrPluginNotRegistered = errors.New("Plugin not registered")
// ErrDiagnosticsNotSupported error returned when plugin doesn't support diagnostics.
ErrDiagnosticsNotSupported = errors.New("Plugin diagnostics not supported")
// ErrHealthCheckFailed error returned when health check failed.
ErrHealthCheckFailed = errors.New("Health check failed")
)
func init() {
registry.Register(&registry.Descriptor{
Name: "BackendPluginManager",
......@@ -33,10 +38,13 @@ type Manager interface {
Register(descriptor PluginDescriptor) error
// StartPlugin starts a non-managed backend plugin
StartPlugin(ctx context.Context, pluginID string) error
// CheckHealth checks the health of a registered backend plugin.
CheckHealth(ctx context.Context, pluginID string) (*CheckHealthResult, error)
// CallResource calls a plugin resource.
CallResource(ctx context.Context, req CallResourceRequest) (*CallResourceResult, error)
}
type manager struct {
RouteRegister routing.RouteRegister `inject:""`
pluginsMu sync.RWMutex
plugins map[string]*BackendPlugin
pluginCollector collector.PluginCollector
......@@ -49,8 +57,6 @@ func (m *manager) Init() error {
m.pluginCollector = collector.NewPluginCollector()
prometheus.MustRegister(m.pluginCollector)
m.RouteRegister.Get("/api/plugins/:pluginId/health", m.checkHealth)
return nil
}
......@@ -140,38 +146,50 @@ func (m *manager) stop() {
}
}
// checkHealth http handler for checking plugin health.
func (m *manager) checkHealth(c *models.ReqContext) {
pluginID := c.Params("pluginId")
// CheckHealth checks the health of a registered backend plugin.
func (m *manager) CheckHealth(ctx context.Context, pluginID string) (*CheckHealthResult, error) {
m.pluginsMu.RLock()
p, registered := m.plugins[pluginID]
m.pluginsMu.RUnlock()
if !registered || !p.supportsDiagnostics() {
c.JsonApiErr(404, "Plugin not found", nil)
return
if !registered {
return nil, ErrPluginNotRegistered
}
if !p.supportsDiagnostics() {
return nil, ErrDiagnosticsNotSupported
}
res, err := p.checkHealth(c.Req.Context())
res, err := p.checkHealth(ctx)
if err != nil {
p.logger.Error("Failed to check plugin health", "error", err)
c.JSON(503, map[string]interface{}{
"status": pluginv2.CheckHealth_Response_ERROR.String(),
})
return
return nil, ErrHealthCheckFailed
}
return checkHealthResultFromProto(res), nil
}
// CallResource calls a plugin resource.
func (m *manager) CallResource(ctx context.Context, req CallResourceRequest) (*CallResourceResult, error) {
m.pluginsMu.RLock()
p, registered := m.plugins[req.Config.PluginID]
m.pluginsMu.RUnlock()
if !registered {
return nil, ErrPluginNotRegistered
}
payload := map[string]interface{}{
"status": res.Status.String(),
"info": res.Info,
res, err := p.callResource(ctx, req)
if err != nil {
return nil, err
}
if res.Status != pluginv2.CheckHealth_Response_OK {
c.JSON(503, payload)
return
// Make sure a content type always is returned in response
if _, exists := res.Headers["Content-Type"]; !exists {
res.Headers["Content-Type"] = []string{"application/json"}
}
c.JSON(200, payload)
return res, nil
}
func startPluginAndRestartKilledProcesses(ctx context.Context, p *BackendPlugin) error {
......
package resource
import (
"bytes"
"net/http"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
)
// ResourceResponseWriter is an implementation of http.ResponseWriter that
// records its mutations for later inspection in tests.
type ResourceResponseWriter struct {
// Code is the HTTP response code set by WriteHeader.
//
// Note that if a Handler never calls WriteHeader or Write,
// this might end up being 0, rather than the implicit
// http.StatusOK. To get the implicit value, use the Result
// method.
Code int
// HeaderMap contains the headers explicitly set by the Handler.
// It is an internal detail.
//
// Deprecated: HeaderMap exists for historical compatibility
// and should not be used. To access the headers returned by a handler,
// use the Response.Header map as returned by the Result method.
HeaderMap http.Header
// Body is the buffer to which the Handler's Write calls are sent.
// If nil, the Writes are silently discarded.
Body *bytes.Buffer
// Flushed is whether the Handler called Flush.
Flushed bool
wroteHeader bool
}
// NewResourceResponseWriter returns an initialized ResponseWriter.
func NewResourceResponseWriter() *ResourceResponseWriter {
return &ResourceResponseWriter{
HeaderMap: make(http.Header),
Body: new(bytes.Buffer),
Code: 200,
}
}
// Header implements http.ResponseWriter. It returns the response
// headers to mutate within a handler. To test the headers that were
// written after a handler completes, use the Result method and see
// the returned Response value's Header.
func (rw *ResourceResponseWriter) Header() http.Header {
m := rw.HeaderMap
if m == nil {
m = make(http.Header)
rw.HeaderMap = m
}
return m
}
// writeHeader writes a header if it was not written yet and
// detects Content-Type if needed.
//
// bytes or str are the beginning of the response body.
// We pass both to avoid unnecessarily generate garbage
// in rw.WriteString which was created for performance reasons.
// Non-nil bytes win.
func (rw *ResourceResponseWriter) writeHeader(b []byte, str string) {
if rw.wroteHeader {
return
}
if len(str) > 512 {
str = str[:512]
}
m := rw.Header()
_, hasType := m["Content-Type"]
hasTE := m.Get("Transfer-Encoding") != ""
if !hasType && !hasTE {
if b == nil {
b = []byte(str)
}
m.Set("Content-Type", http.DetectContentType(b))
}
rw.WriteHeader(200)
}
// Write implements http.ResponseWriter. The data in buf is written to
// rw.Body, if not nil.
func (rw *ResourceResponseWriter) Write(buf []byte) (int, error) {
rw.writeHeader(buf, "")
if rw.Body != nil {
rw.Body.Write(buf)
}
return len(buf), nil
}
// WriteHeader implements http.ResponseWriter.
func (rw *ResourceResponseWriter) WriteHeader(code int) {
if rw.wroteHeader {
return
}
rw.Code = code
rw.wroteHeader = true
if rw.HeaderMap == nil {
rw.HeaderMap = make(http.Header)
}
}
// Flush implements http.Flusher.
func (rw *ResourceResponseWriter) Flush() {
if !rw.wroteHeader {
rw.WriteHeader(200)
}
}
// Result returns the response generated by the handler.
func (rw *ResourceResponseWriter) Result() *pluginv2.CallResource_Response {
res := &pluginv2.CallResource_Response{
Code: int32(rw.Code),
Headers: map[string]*pluginv2.CallResource_StringList{},
}
if rw.Body != nil {
res.Body = rw.Body.Bytes()
}
for key, values := range rw.Header().Clone() {
res.Headers[key] = &pluginv2.CallResource_StringList{
Values: values,
}
}
return res
}
......@@ -3,7 +3,8 @@ package wrapper
import (
"context"
sdk "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
......@@ -11,12 +12,12 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
func NewDatasourcePluginWrapperV2(log log.Logger, plugin sdk.BackendPlugin) *DatasourcePluginWrapperV2 {
return &DatasourcePluginWrapperV2{BackendPlugin: plugin, logger: log}
func NewDatasourcePluginWrapperV2(log log.Logger, plugin backendplugin.DatasourcePlugin) *DatasourcePluginWrapperV2 {
return &DatasourcePluginWrapperV2{DatasourcePlugin: plugin, logger: log}
}
type DatasourcePluginWrapperV2 struct {
sdk.BackendPlugin
backendplugin.DatasourcePlugin
logger log.Logger
}
......@@ -56,7 +57,7 @@ func (tw *DatasourcePluginWrapperV2) Query(ctx context.Context, ds *models.DataS
})
}
pbRes, err := tw.BackendPlugin.DataQuery(ctx, pbQuery)
pbRes, err := tw.DatasourcePlugin.DataQuery(ctx, pbQuery)
if err != nil {
return nil, err
}
......
......@@ -68,9 +68,9 @@ func (p *DataSourcePlugin) onLegacyPluginStart(pluginID string, client *backendp
}
func (p *DataSourcePlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error {
if client.BackendPlugin != nil {
if client.DatasourcePlugin != nil {
tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapperV2(logger, client.BackendPlugin), nil
return wrapper.NewDatasourcePluginWrapperV2(logger, client.DatasourcePlugin), nil
})
}
......
......@@ -7,7 +7,6 @@ import (
"path"
"strconv"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/dataframe"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/grafana/grafana/pkg/bus"
......@@ -54,9 +53,9 @@ func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string, backendP
func (p *TransformPlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error {
p.TransformWrapper = NewTransformWrapper(logger, client.TransformPlugin)
if client.BackendPlugin != nil {
if client.DatasourcePlugin != nil {
tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapperV2(logger, client.BackendPlugin), nil
return wrapper.NewDatasourcePluginWrapperV2(logger, client.DatasourcePlugin), nil
})
}
......@@ -67,12 +66,12 @@ func (p *TransformPlugin) onPluginStart(pluginID string, client *backendplugin.C
// Wrapper Code
// ...
func NewTransformWrapper(log log.Logger, plugin backend.TransformPlugin) *TransformWrapper {
func NewTransformWrapper(log log.Logger, plugin backendplugin.TransformPlugin) *TransformWrapper {
return &TransformWrapper{plugin, log, &transformCallback{log}}
}
type TransformWrapper struct {
backend.TransformPlugin
backendplugin.TransformPlugin
logger log.Logger
callback *transformCallback
}
......
package backend
import (
"context"
"encoding/json"
"time"
"github.com/grafana/grafana-plugin-sdk-go/dataframe"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
)
// PluginConfig holds configuration for the queried plugin.
type PluginConfig struct {
ID int64
OrgID int64
Name string
Type string
URL string
JSONData json.RawMessage
}
// PluginConfigFromProto converts the generated protobuf PluginConfig to this
// package's PluginConfig.
func pluginConfigFromProto(pc *pluginv2.PluginConfig) PluginConfig {
return PluginConfig{
ID: pc.Id,
OrgID: pc.OrgId,
Name: pc.Name,
Type: pc.Type,
URL: pc.Url,
JSONData: json.RawMessage([]byte(pc.JsonData)),
}
}
func (pc PluginConfig) toProtobuf() *pluginv2.PluginConfig {
return &pluginv2.PluginConfig{
Id: pc.ID,
OrgId: pc.OrgID,
Name: pc.Name,
Type: pc.Type,
Url: pc.URL,
JsonData: string(pc.JSONData),
}
}
type DataQueryRequest struct {
PluginConfig PluginConfig
Headers map[string]string
Queries []DataQuery
}
func dataQueryRequestFromProto(pc *pluginv2.DataQueryRequest) *DataQueryRequest {
queries := make([]DataQuery, len(pc.Queries))
for i, q := range pc.Queries {
queries[i] = *dataQueryFromProtobuf(q)
}
return &DataQueryRequest{
PluginConfig: pluginConfigFromProto(pc.Config),
Headers: pc.Headers,
Queries: queries,
}
}
func (dr *DataQueryRequest) toProtobuf() *pluginv2.DataQueryRequest {
queries := make([]*pluginv2.DataQuery, len(dr.Queries))
for i, q := range dr.Queries {
queries[i] = q.toProtobuf()
}
return &pluginv2.DataQueryRequest{
Config: dr.PluginConfig.toProtobuf(),
Headers: dr.Headers,
Queries: queries,
}
}
// DataQuery represents the query as sent from the frontend.
type DataQuery struct {
RefID string
MaxDataPoints int64
Interval time.Duration
TimeRange TimeRange
JSON json.RawMessage
}
func (q *DataQuery) toProtobuf() *pluginv2.DataQuery {
return &pluginv2.DataQuery{
RefId: q.RefID,
MaxDataPoints: q.MaxDataPoints,
IntervalMS: q.Interval.Microseconds(),
TimeRange: q.TimeRange.toProtobuf(),
Json: q.JSON,
}
}
func dataQueryFromProtobuf(q *pluginv2.DataQuery) *DataQuery {
return &DataQuery{
RefID: q.RefId,
MaxDataPoints: q.MaxDataPoints,
TimeRange: timeRangeFromProtobuf(q.TimeRange),
Interval: time.Duration(q.IntervalMS) * time.Millisecond,
JSON: []byte(q.Json),
}
}
// DataQueryResponse holds the results for a given query.
type DataQueryResponse struct {
Frames []*dataframe.Frame
Metadata map[string]string
}
func (res *DataQueryResponse) toProtobuf() (*pluginv2.DataQueryResponse, error) {
encodedFrames := make([][]byte, len(res.Frames))
var err error
for i, frame := range res.Frames {
encodedFrames[i], err = dataframe.MarshalArrow(frame)
if err != nil {
return nil, err
}
}
return &pluginv2.DataQueryResponse{
Frames: encodedFrames,
Metadata: res.Metadata,
}, nil
}
func dataQueryResponseFromProtobuf(res *pluginv2.DataQueryResponse) (*DataQueryResponse, error) {
frames := make([]*dataframe.Frame, len(res.Frames))
var err error
for i, encodedFrame := range res.Frames {
frames[i], err = dataframe.UnmarshalArrow(encodedFrame)
if err != nil {
return nil, err
}
}
return &DataQueryResponse{Metadata: res.Metadata, Frames: frames}, nil
}
// TimeRange represents a time range for a query.
type TimeRange struct {
From time.Time
To time.Time
}
func (tr *TimeRange) toProtobuf() *pluginv2.TimeRange {
return &pluginv2.TimeRange{
FromEpochMS: tr.From.UnixNano() / int64(time.Millisecond),
ToEpochMS: tr.To.UnixNano() / int64(time.Millisecond),
}
}
// TimeRangeFromProtobuf converts the generated protobuf TimeRange to this
// package's FetchInfo.
func timeRangeFromProtobuf(tr *pluginv2.TimeRange) TimeRange {
return TimeRange{
From: time.Unix(0, tr.FromEpochMS*int64(time.Millisecond)),
To: time.Unix(0, tr.ToEpochMS*int64(time.Millisecond)),
}
}
type ResourceRequest struct {
PluginConfig PluginConfig
Headers map[string]string
Method string
Path string
Body []byte
}
func resourceRequestFromProtobuf(req *pluginv2.ResourceRequest) *ResourceRequest {
return &ResourceRequest{
PluginConfig: pluginConfigFromProto(req.Config),
Headers: req.Headers,
Method: req.Method,
Path: req.Path,
Body: req.Body,
}
}
type ResourceResponse struct {
Headers map[string]string
Code int32
Body []byte
}
func (rr *ResourceResponse) toProtobuf() *pluginv2.ResourceResponse {
return &pluginv2.ResourceResponse{
Headers: rr.Headers,
Code: rr.Code,
Body: rr.Body,
}
}
// DataQueryHandler handles data source queries.
type DataQueryHandler interface {
DataQuery(ctx context.Context, req *DataQueryRequest) (*DataQueryResponse, error)
}
// ResourceHandler handles backend plugin checks.
type ResourceHandler interface {
Resource(ctx context.Context, req *ResourceRequest) (*ResourceResponse, error)
}
// PluginHandlers is the collection of handlers that corresponds to the
// grpc "service BackendPlugin".
type PluginHandlers interface {
DataQueryHandler
ResourceHandler
}
// BackendPlugin is the Grafana backend plugin interface.
type BackendPlugin interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error)
Resource(ctx context.Context, req *pluginv2.ResourceRequest) (*pluginv2.ResourceResponse, error)
}
package backend
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
)
// DiagnosticsPlugin is the Grafana diagnostics plugin interface.
type DiagnosticsPlugin interface {
CollectMetrics(ctx context.Context, req *pluginv2.CollectMetrics_Request) (*pluginv2.CollectMetrics_Response, error)
CheckHealth(ctx context.Context, req *pluginv2.CheckHealth_Request) (*pluginv2.CheckHealth_Response, error)
}
type CheckHealthHandler interface {
CheckHealth(ctx context.Context) (*CheckHealthResult, error)
}
// HealthStatus is the status of the plugin.
type HealthStatus int
const (
// HealthStatusUnknown means the status of the plugin is unknown.
HealthStatusUnknown HealthStatus = iota
// HealthStatusOk means the status of the plugin is good.
HealthStatusOk
// HealthStatusError means the plugin is in an error state.
HealthStatusError
)
func (ps HealthStatus) toProtobuf() pluginv2.CheckHealth_Response_HealthStatus {
switch ps {
case HealthStatusUnknown:
return pluginv2.CheckHealth_Response_UNKNOWN
case HealthStatusOk:
return pluginv2.CheckHealth_Response_OK
case HealthStatusError:
return pluginv2.CheckHealth_Response_ERROR
}
panic("unsupported protobuf health status type in sdk")
}
type CheckHealthResult struct {
Status HealthStatus
Info string
}
func (res *CheckHealthResult) toProtobuf() *pluginv2.CheckHealth_Response {
return &pluginv2.CheckHealth_Response{
Status: res.Status.toProtobuf(),
Info: res.Info,
}
}
package backend
import plugin "github.com/hashicorp/go-plugin"
const (
MagicCookieKey = "grafana_plugin_type"
MagicCookieValue = "datasource"
ProtocolVersion = 2
)
var Handshake = plugin.HandshakeConfig{
ProtocolVersion: ProtocolVersion,
MagicCookieKey: MagicCookieKey,
MagicCookieValue: MagicCookieValue,
}
package backend
package plugin
import (
"context"
......@@ -8,16 +8,20 @@ import (
"google.golang.org/grpc"
)
type CoreServer interface {
pluginv2.CoreServer
}
// CoreGRPCPlugin implements the GRPCPlugin interface from github.com/hashicorp/go-plugin.
type CoreGRPCPlugin struct {
plugin.NetRPCUnsupportedPlugin
plugin.GRPCPlugin
server pluginv2.CoreServer
CoreServer CoreServer
}
func (p *CoreGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
pluginv2.RegisterCoreServer(s, &coreGRPCServer{
server: p.server,
server: p.CoreServer,
})
return nil
}
......@@ -27,15 +31,15 @@ func (p *CoreGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBrok
}
type coreGRPCServer struct {
server pluginv2.CoreServer
server CoreServer
}
func (s *coreGRPCServer) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error) {
return s.server.DataQuery(ctx, req)
}
func (s *coreGRPCServer) Resource(ctx context.Context, req *pluginv2.ResourceRequest) (*pluginv2.ResourceResponse, error) {
return s.server.Resource(ctx, req)
func (s *coreGRPCServer) CallResource(ctx context.Context, req *pluginv2.CallResource_Request) (*pluginv2.CallResource_Response, error) {
return s.server.CallResource(ctx, req)
}
type coreGRPCClient struct {
......@@ -46,6 +50,9 @@ func (m *coreGRPCClient) DataQuery(ctx context.Context, req *pluginv2.DataQueryR
return m.client.DataQuery(ctx, req)
}
func (m *coreGRPCClient) Resource(ctx context.Context, req *pluginv2.ResourceRequest) (*pluginv2.ResourceResponse, error) {
return m.client.Resource(ctx, req)
func (m *coreGRPCClient) CallResource(ctx context.Context, req *pluginv2.CallResource_Request) (*pluginv2.CallResource_Response, error) {
return m.client.CallResource(ctx, req)
}
var _ CoreServer = &coreGRPCServer{}
var _ CoreServer = &coreGRPCClient{}
package backend
package plugin
import (
"context"
......@@ -8,16 +8,20 @@ import (
"google.golang.org/grpc"
)
type DiagnosticsServer interface {
pluginv2.DiagnosticsServer
}
// DiagnosticsGRPCPlugin implements the GRPCPlugin interface from github.com/hashicorp/go-plugin.
type DiagnosticsGRPCPlugin struct {
plugin.NetRPCUnsupportedPlugin
plugin.GRPCPlugin
server pluginv2.DiagnosticsServer
DiagnosticsServer DiagnosticsServer
}
func (p *DiagnosticsGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
pluginv2.RegisterDiagnosticsServer(s, &diagnosticsGRPCServer{
server: p.server,
server: p.DiagnosticsServer,
})
return nil
}
......@@ -27,7 +31,7 @@ func (p *DiagnosticsGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.G
}
type diagnosticsGRPCServer struct {
server pluginv2.DiagnosticsServer
server DiagnosticsServer
}
func (s *diagnosticsGRPCServer) CollectMetrics(ctx context.Context, req *pluginv2.CollectMetrics_Request) (*pluginv2.CollectMetrics_Response, error) {
......@@ -49,3 +53,6 @@ func (s *diagnosticsGRPCClient) CollectMetrics(ctx context.Context, req *pluginv
func (s *diagnosticsGRPCClient) CheckHealth(ctx context.Context, req *pluginv2.CheckHealth_Request) (*pluginv2.CheckHealth_Response, error) {
return s.client.CheckHealth(ctx, req)
}
var _ DiagnosticsServer = &diagnosticsGRPCServer{}
var _ DiagnosticsServer = &diagnosticsGRPCClient{}
package backend
package plugin
import (
"context"
......@@ -11,17 +11,29 @@ import (
"google.golang.org/grpc/metadata"
)
type TransformServer interface {
TransformData(ctx context.Context, req *pluginv2.DataQueryRequest, callback TransformCallBack) (*pluginv2.DataQueryResponse, error)
}
type transformClient interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest, callback TransformCallBack) (*pluginv2.DataQueryResponse, error)
}
type TransformCallBack interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error)
}
// TransformGRPCPlugin implements the GRPCPlugin interface from github.com/hashicorp/go-plugin.
type TransformGRPCPlugin struct {
plugin.NetRPCUnsupportedPlugin
plugin.GRPCPlugin
adapter *sdkAdapter
TransformServer TransformServer
}
func (p *TransformGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
pluginv2.RegisterTransformServer(s, &transformGRPCServer{
adapter: p.adapter,
broker: broker,
server: p.TransformServer,
broker: broker,
})
return nil
}
......@@ -31,8 +43,8 @@ func (p *TransformGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRP
}
type transformGRPCServer struct {
broker *plugin.GRPCBroker
adapter *sdkAdapter
broker *plugin.GRPCBroker
server TransformServer
}
func (t *transformGRPCServer) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error) {
......@@ -53,8 +65,8 @@ func (t *transformGRPCServer) DataQuery(ctx context.Context, req *pluginv2.DataQ
return nil, err
}
defer conn.Close()
api := &TransformCallBackGrpcClient{pluginv2.NewTransformCallBackClient(conn)}
return t.adapter.TransformData(ctx, req, api)
api := &transformCallBackGrpcClient{pluginv2.NewTransformCallBackClient(conn)}
return t.server.TransformData(ctx, req, api)
}
type transformGRPCClient struct {
......@@ -63,7 +75,7 @@ type transformGRPCClient struct {
}
func (t *transformGRPCClient) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest, callBack TransformCallBack) (*pluginv2.DataQueryResponse, error) {
callBackServer := &TransformCallBackGrpcServer{Impl: callBack}
callBackServer := &transformCallBackGrpcServer{Impl: callBack}
var s *grpc.Server
serverFunc := func(opts []grpc.ServerOption) *grpc.Server {
......@@ -83,18 +95,23 @@ func (t *transformGRPCClient) DataQuery(ctx context.Context, req *pluginv2.DataQ
// Callback
type TransformCallBackGrpcClient struct {
client pluginv2.TransformCallBackClient
type transformCallBackGrpcServer struct {
Impl TransformCallBack
}
func (t *TransformCallBackGrpcClient) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error) {
return t.client.DataQuery(ctx, req)
func (g *transformCallBackGrpcServer) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error) {
return g.Impl.DataQuery(ctx, req)
}
type TransformCallBackGrpcServer struct {
Impl TransformCallBack
type transformCallBackGrpcClient struct {
client pluginv2.TransformCallBackClient
}
func (g *TransformCallBackGrpcServer) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error) {
return g.Impl.DataQuery(ctx, req)
func (t *transformCallBackGrpcClient) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error) {
return t.client.DataQuery(ctx, req)
}
var _ pluginv2.TransformServer = &transformGRPCServer{}
var _ transformClient = &transformGRPCClient{}
var _ pluginv2.TransformServer = &transformCallBackGrpcServer{}
var _ pluginv2.TransformServer = &transformCallBackGrpcClient{}
package plugin
import (
plugin "github.com/hashicorp/go-plugin"
)
const (
// ProtocolVersion is the current (latest) supported protocol version.
ProtocolVersion = 2
// MagicCookieKey is the the magic cookie key that will be used for negotiating
// between plugin host and client.
// Should NEVER be changed.
MagicCookieKey = "grafana_plugin_type"
// MagicCookieValue is the the magic cookie value that will be used for negotiating
// between plugin host and client.
// Should NEVER be changed.
MagicCookieValue = "datasource"
)
// handshake is the HandshakeConfig used to configure clients and servers.
var handshake = plugin.HandshakeConfig{
ProtocolVersion: ProtocolVersion,
MagicCookieKey: MagicCookieKey,
MagicCookieValue: MagicCookieValue,
}
package backend
package plugin
import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
plugin "github.com/hashicorp/go-plugin"
"google.golang.org/grpc"
)
//ServeOpts options for serving plugins.
type ServeOpts struct {
CheckHealthHandler CheckHealthHandler
DataQueryHandler DataQueryHandler
ResourceHandler ResourceHandler
TransformDataHandler TransformDataHandler
DiagnosticsServer DiagnosticsServer
CoreServer CoreServer
TransformServer TransformServer
// GRPCServer factory method for creating GRPC server.
// If nil, the default one will be used.
......@@ -22,45 +21,32 @@ func Serve(opts ServeOpts) error {
versionedPlugins := make(map[int]plugin.PluginSet)
pSet := make(plugin.PluginSet)
sdkAdapter := &sdkAdapter{
checkHealthHandler: opts.CheckHealthHandler,
dataQueryHandler: opts.DataQueryHandler,
resourceHandler: opts.ResourceHandler,
transformDataHandler: opts.TransformDataHandler,
}
pSet["diagnostics"] = &DiagnosticsGRPCPlugin{
server: sdkAdapter,
if opts.DiagnosticsServer != nil {
pSet["diagnostics"] = &DiagnosticsGRPCPlugin{
DiagnosticsServer: opts.DiagnosticsServer,
}
}
if opts.DataQueryHandler != nil || opts.ResourceHandler != nil {
if opts.CoreServer != nil {
pSet["backend"] = &CoreGRPCPlugin{
server: sdkAdapter,
CoreServer: opts.CoreServer,
}
}
if opts.TransformDataHandler != nil {
if opts.TransformServer != nil {
pSet["transform"] = &TransformGRPCPlugin{
adapter: sdkAdapter,
TransformServer: opts.TransformServer,
}
}
versionedPlugins[ProtocolVersion] = pSet
if opts.GRPCServer == nil {
// opts.GRPCServer = plugin.DefaultGRPCServer
// hack for now to add grpc prometheuc server interceptor
opts.GRPCServer = func(serverOptions []grpc.ServerOption) *grpc.Server {
mergedOptions := serverOptions
mergedOptions = append(mergedOptions, grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor))
server := grpc.NewServer(mergedOptions...)
grpc_prometheus.Register(server)
return server
}
opts.GRPCServer = plugin.DefaultGRPCServer
}
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: Handshake,
HandshakeConfig: handshake,
VersionedPlugins: versionedPlugins,
GRPCServer: opts.GRPCServer,
})
......
package backend
import (
"bytes"
"context"
"github.com/grafana/grafana-plugin-sdk-go/dataframe"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
)
// sdkAdapter adapter between protobuf and SDK interfaces.
type sdkAdapter struct {
checkHealthHandler CheckHealthHandler
dataQueryHandler DataQueryHandler
resourceHandler ResourceHandler
transformDataHandler TransformDataHandler
}
func (a *sdkAdapter) CollectMetrics(ctx context.Context, protoReq *pluginv2.CollectMetrics_Request) (*pluginv2.CollectMetrics_Response, error) {
mfs, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return nil, err
}
var buf bytes.Buffer
for _, mf := range mfs {
_, err := expfmt.MetricFamilyToText(&buf, mf)
if err != nil {
return nil, err
}
}
return &pluginv2.CollectMetrics_Response{
Metrics: &pluginv2.CollectMetrics_Payload{
Prometheus: buf.Bytes(),
},
}, nil
}
func (a *sdkAdapter) CheckHealth(ctx context.Context, protoReq *pluginv2.CheckHealth_Request) (*pluginv2.CheckHealth_Response, error) {
if a.checkHealthHandler != nil {
res, err := a.checkHealthHandler.CheckHealth(ctx)
if err != nil {
return nil, err
}
return res.toProtobuf(), nil
}
return &pluginv2.CheckHealth_Response{
Status: pluginv2.CheckHealth_Response_OK,
}, nil
}
func (a *sdkAdapter) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error) {
resp, err := a.dataQueryHandler.DataQuery(ctx, dataQueryRequestFromProto(req))
if err != nil {
return nil, err
}
return resp.toProtobuf()
}
func (a *sdkAdapter) Resource(ctx context.Context, req *pluginv2.ResourceRequest) (*pluginv2.ResourceResponse, error) {
res, err := a.resourceHandler.Resource(ctx, resourceRequestFromProtobuf(req))
if err != nil {
return nil, err
}
return res.toProtobuf(), nil
}
func (a *sdkAdapter) TransformData(ctx context.Context, req *pluginv2.DataQueryRequest, callBack TransformCallBack) (*pluginv2.DataQueryResponse, error) {
resp, err := a.transformDataHandler.TransformData(ctx, dataQueryRequestFromProto(req), &transformCallBackWrapper{callBack})
if err != nil {
return nil, err
}
encodedFrames := make([][]byte, len(resp.Frames))
for i, frame := range resp.Frames {
encodedFrames[i], err = dataframe.MarshalArrow(frame)
if err != nil {
return nil, err
}
}
return &pluginv2.DataQueryResponse{
Frames: encodedFrames,
Metadata: resp.Metadata,
}, nil
}
package backend
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
)
type TransformHandlers interface {
TransformDataHandler
}
type TransformDataHandler interface {
TransformData(ctx context.Context, req *DataQueryRequest, callBack TransformCallBackHandler) (*DataQueryResponse, error)
}
// Callback
type TransformCallBackHandler interface {
// TODO: Forget if I actually need PluginConfig on the callback or not.
DataQuery(ctx context.Context, req *DataQueryRequest) (*DataQueryResponse, error)
}
type transformCallBackWrapper struct {
callBack TransformCallBack
}
func (tw *transformCallBackWrapper) DataQuery(ctx context.Context, req *DataQueryRequest) (*DataQueryResponse, error) {
protoRes, err := tw.callBack.DataQuery(ctx, req.toProtobuf())
if err != nil {
return nil, err
}
return dataQueryResponseFromProtobuf(protoRes)
}
// TransformPlugin is the Grafana transform plugin interface.
type TransformPlugin interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest, callback TransformCallBack) (*pluginv2.DataQueryResponse, error)
}
// Callback
type TransformCallBack interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error)
}
......@@ -80,10 +80,14 @@ func buildArrowFields(f *Frame) ([]arrow.Field, error) {
return nil, err
}
fieldMeta := map[string]string{
"name": field.Name,
"labels": field.Labels.String(),
fieldMeta := map[string]string{"name": field.Name}
if field.Labels != nil {
if fieldMeta["labels"], err = toJSONString(field.Labels); err != nil {
return nil, err
}
}
if field.Config != nil {
str, err := toJSONString(field.Config)
if err != nil {
......@@ -293,16 +297,12 @@ func initializeFrameFields(schema *arrow.Schema, frame *Frame) ([]bool, error) {
Name: field.Name,
}
if labelsAsString, ok := getMDKey("labels", field.Metadata); ok {
var err error
sdkField.Labels, err = LabelsFromString(labelsAsString)
if err != nil {
if err := json.Unmarshal([]byte(labelsAsString), &sdkField.Labels); err != nil {
return nil, err
}
}
if configAsString, ok := getMDKey("config", field.Metadata); ok {
var err error
sdkField.Config, err = FieldConfigFromJSON(configAsString)
if err != nil {
if err := json.Unmarshal([]byte(configAsString), &sdkField.Config); err != nil {
return nil, err
}
}
......@@ -645,7 +645,8 @@ func UnmarshalArrow(b []byte) (*Frame, error) {
return frame, nil
}
// ToJSONString return the FieldConfig as a json string
// ToJSONString calls json.Marshal on val and returns it as a string. An
// error is returned if json.Marshal errors.
func toJSONString(val interface{}) (string, error) {
b, err := json.Marshal(val)
if err != nil {
......
package dataframe
import (
"encoding/json"
"fmt"
"math"
"strconv"
......@@ -72,16 +71,6 @@ func (sf *ConfFloat64) UnmarshalJSON(data []byte) error {
return nil
}
// FieldConfigFromJSON create a FieldConfig from json string
func FieldConfigFromJSON(jsonStr string) (*FieldConfig, error) {
var cfg FieldConfig
err := json.Unmarshal([]byte(jsonStr), &cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}
// SetDecimals modifies the FieldConfig's Decimals property to
// be set to v and returns the FieldConfig. It is a convenance function
// since the Decimals property is a pointer.
......
// -build !test
package dataframe
//go:generate genny -in=$GOFILE -out=nullable_vector.gen.go gen "gen=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
......@@ -28,7 +26,5 @@ func (v *nullablegenVector) Len() int {
}
func (v *nullablegenVector) PrimitiveType() VectorPType {
// following generates the right code but makes this invalid
//return VectorPTypeNullablegen
return vectorPType(v)
}
......@@ -2,8 +2,6 @@
// Any changes will be lost if this file is regenerated.
// see https://github.com/cheekybits/genny
// -build !test
package dataframe
import "time"
......@@ -34,13 +32,9 @@ func (v *nullableUint8Vector) Len() int {
}
func (v *nullableUint8Vector) PrimitiveType() VectorPType {
// following uint8erates the right code but makes this invalid
//return VectorPTypeNullableUint8
return vectorPType(v)
}
// -build !test
//go:Uint16erate uint16ny -in=$GOFILE -out=nullable_vector.Uint16.go uint16 "Uint16=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableUint16Vector []*uint16
......@@ -67,13 +61,9 @@ func (v *nullableUint16Vector) Len() int {
}
func (v *nullableUint16Vector) PrimitiveType() VectorPType {
// following uint16erates the right code but makes this invalid
//return VectorPTypeNullableUint16
return vectorPType(v)
}
// -build !test
//go:Uint32erate uint32ny -in=$GOFILE -out=nullable_vector.Uint32.go uint32 "Uint32=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableUint32Vector []*uint32
......@@ -100,13 +90,9 @@ func (v *nullableUint32Vector) Len() int {
}
func (v *nullableUint32Vector) PrimitiveType() VectorPType {
// following uint32erates the right code but makes this invalid
//return VectorPTypeNullableUint32
return vectorPType(v)
}
// -build !test
//go:Uint64erate uint64ny -in=$GOFILE -out=nullable_vector.Uint64.go uint64 "Uint64=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableUint64Vector []*uint64
......@@ -133,13 +119,9 @@ func (v *nullableUint64Vector) Len() int {
}
func (v *nullableUint64Vector) PrimitiveType() VectorPType {
// following uint64erates the right code but makes this invalid
//return VectorPTypeNullableUint64
return vectorPType(v)
}
// -build !test
//go:Int8erate int8ny -in=$GOFILE -out=nullable_vector.Int8.go int8 "Int8=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableInt8Vector []*int8
......@@ -166,13 +148,9 @@ func (v *nullableInt8Vector) Len() int {
}
func (v *nullableInt8Vector) PrimitiveType() VectorPType {
// following int8erates the right code but makes this invalid
//return VectorPTypeNullableInt8
return vectorPType(v)
}
// -build !test
//go:Int16erate int16ny -in=$GOFILE -out=nullable_vector.Int16.go int16 "Int16=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableInt16Vector []*int16
......@@ -199,13 +177,9 @@ func (v *nullableInt16Vector) Len() int {
}
func (v *nullableInt16Vector) PrimitiveType() VectorPType {
// following int16erates the right code but makes this invalid
//return VectorPTypeNullableInt16
return vectorPType(v)
}
// -build !test
//go:Int32erate int32ny -in=$GOFILE -out=nullable_vector.Int32.go int32 "Int32=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableInt32Vector []*int32
......@@ -232,13 +206,9 @@ func (v *nullableInt32Vector) Len() int {
}
func (v *nullableInt32Vector) PrimitiveType() VectorPType {
// following int32erates the right code but makes this invalid
//return VectorPTypeNullableInt32
return vectorPType(v)
}
// -build !test
//go:Int64erate int64ny -in=$GOFILE -out=nullable_vector.Int64.go int64 "Int64=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableInt64Vector []*int64
......@@ -265,13 +235,9 @@ func (v *nullableInt64Vector) Len() int {
}
func (v *nullableInt64Vector) PrimitiveType() VectorPType {
// following int64erates the right code but makes this invalid
//return VectorPTypeNullableInt64
return vectorPType(v)
}
// -build !test
//go:Float32erate float32ny -in=$GOFILE -out=nullable_vector.Float32.go float32 "Float32=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableFloat32Vector []*float32
......@@ -298,13 +264,9 @@ func (v *nullableFloat32Vector) Len() int {
}
func (v *nullableFloat32Vector) PrimitiveType() VectorPType {
// following float32erates the right code but makes this invalid
//return VectorPTypeNullableFloat32
return vectorPType(v)
}
// -build !test
//go:Float64erate float64ny -in=$GOFILE -out=nullable_vector.Float64.go float64 "Float64=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableFloat64Vector []*float64
......@@ -331,13 +293,9 @@ func (v *nullableFloat64Vector) Len() int {
}
func (v *nullableFloat64Vector) PrimitiveType() VectorPType {
// following float64erates the right code but makes this invalid
//return VectorPTypeNullableFloat64
return vectorPType(v)
}
// -build !test
//go:Stringerate stringny -in=$GOFILE -out=nullable_vector.String.go string "String=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableStringVector []*string
......@@ -364,13 +322,9 @@ func (v *nullableStringVector) Len() int {
}
func (v *nullableStringVector) PrimitiveType() VectorPType {
// following stringerates the right code but makes this invalid
//return VectorPTypeNullableString
return vectorPType(v)
}
// -build !test
//go:Boolerate boolny -in=$GOFILE -out=nullable_vector.Bool.go bool "Bool=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableBoolVector []*bool
......@@ -397,13 +351,9 @@ func (v *nullableBoolVector) Len() int {
}
func (v *nullableBoolVector) PrimitiveType() VectorPType {
// following boolerates the right code but makes this invalid
//return VectorPTypeNullableBool
return vectorPType(v)
}
// -build !test
//go:TimeTimeerate timeTimeny -in=$GOFILE -out=nullable_vector.TimeTime.go time.Time "TimeTime=uint8,uint16,uint32,uint64,int8,int16,int32,int64,float32,float64,string,bool,time.Time"
type nullableTimeTimeVector []*time.Time
......@@ -430,7 +380,5 @@ func (v *nullableTimeTimeVector) Len() int {
}
func (v *nullableTimeTimeVector) PrimitiveType() VectorPType {
// following timeTimeerates the right code but makes this invalid
//return VectorPTypeNullableTimeTime
return vectorPType(v)
}
#vendor
vendor/
# Created by .ignore support plugin (hsz.mobi)
coverage.txt
### Go template
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof
### Windows template
# Windows image file caches
Thumbs.db
ehthumbs.db
# Folder config file
Desktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msm
*.msp
# Windows shortcuts
*.lnk
### Kate template
# Swap Files #
.*.kate-swp
.swp.*
### SublimeText template
# cache files for sublime text
*.tmlanguage.cache
*.tmPreferences.cache
*.stTheme.cache
# workspace files are user-specific
*.sublime-workspace
# project files should be checked into the repository, unless a significant
# proportion of contributors will probably not be using SublimeText
# *.sublime-project
# sftp configuration file
sftp-config.json
### Linux template
*~
# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*
# KDE directory preferences
.directory
# Linux trash folder which might appear on any partition or disk
.Trash-*
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea
.idea/tasks.xml
.idea/dictionaries
.idea/vcs.xml
.idea/jsLibraryMappings.xml
# Sensitive or high-churn files:
.idea/dataSources.ids
.idea/dataSources.xml
.idea/dataSources.local.xml
.idea/sqlDataSources.xml
.idea/dynamic.xml
.idea/uiDesigner.xml
# Gradle:
.idea/gradle.xml
.idea/libraries
# Mongo Explorer plugin:
.idea/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
### Xcode template
# Xcode
#
# gitignore contributors: remember to update Global/Xcode.gitignore, Objective-C.gitignore & Swift.gitignore
## Build generated
build/
DerivedData/
## Various settings
*.pbxuser
!default.pbxuser
*.mode1v3
!default.mode1v3
*.mode2v3
!default.mode2v3
*.perspectivev3
!default.perspectivev3
xcuserdata/
## Other
*.moved-aside
*.xccheckout
*.xcscmblueprint
### Eclipse template
.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.settings/
.loadpath
.recommenders
# Eclipse Core
.project
# External tool builders
.externalToolBuilders/
# Locally stored "Eclipse launch configurations"
*.launch
# PyDev specific (Python IDE for Eclipse)
*.pydevproject
# CDT-specific (C/C++ Development Tooling)
.cproject
# JDT-specific (Eclipse Java Development Tools)
.classpath
# Java annotation processor (APT)
.factorypath
# PDT-specific (PHP Development Tools)
.buildpath
# sbteclipse plugin
.target
# Tern plugin
.tern-project
# TeXlipse plugin
.texlipse
# STS (Spring Tool Suite)
.springBeans
# Code Recommenders
.recommenders/
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [1.2.0](https://github.com/grpc-ecosystem/go-grpc-prometheus/releases/tag/v1.2.0) - 2018-06-04
### Added
* Provide metrics object as `prometheus.Collector`, for conventional metric registration.
* Support non-default/global Prometheus registry.
* Allow configuring counters with `prometheus.CounterOpts`.
### Changed
* Remove usage of deprecated `grpc.Code()`.
* Remove usage of deprecated `grpc.Errorf` and replace with `status.Errorf`.
---
This changelog was started with version `v1.2.0`, for earlier versions refer to the respective [GitHub releases](https://github.com/grpc-ecosystem/go-grpc-prometheus/releases).
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
// gRPC Prometheus monitoring interceptors for client-side gRPC.
package grpc_prometheus
import (
prom "github.com/prometheus/client_golang/prometheus"
)
var (
// DefaultClientMetrics is the default instance of ClientMetrics. It is
// intended to be used in conjunction the default Prometheus metrics
// registry.
DefaultClientMetrics = NewClientMetrics()
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
UnaryClientInterceptor = DefaultClientMetrics.UnaryClientInterceptor()
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
StreamClientInterceptor = DefaultClientMetrics.StreamClientInterceptor()
)
func init() {
prom.MustRegister(DefaultClientMetrics.clientStartedCounter)
prom.MustRegister(DefaultClientMetrics.clientHandledCounter)
prom.MustRegister(DefaultClientMetrics.clientStreamMsgReceived)
prom.MustRegister(DefaultClientMetrics.clientStreamMsgSent)
}
// EnableClientHandlingTimeHistogram turns on recording of handling time of
// RPCs. Histogram metrics can be very expensive for Prometheus to retain and
// query. This function acts on the DefaultClientMetrics variable and the
// default Prometheus metrics registry.
func EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
DefaultClientMetrics.EnableClientHandlingTimeHistogram(opts...)
prom.Register(DefaultClientMetrics.clientHandledHistogram)
}
package grpc_prometheus
import (
"io"
prom "github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ClientMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC client.
type ClientMetrics struct {
clientStartedCounter *prom.CounterVec
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec
clientHandledHistogramEnabled bool
clientHandledHistogramOpts prom.HistogramOpts
clientHandledHistogram *prom.HistogramVec
}
// NewClientMetrics returns a ClientMetrics object. Use a new instance of
// ClientMetrics when not using the default Prometheus metrics registry, for
// example when wanting to control which metrics are added to a registry as
// opposed to automatically adding metrics via init functions.
func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
opts := counterOptions(counterOpts)
return &ClientMetrics{
clientStartedCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_started_total",
Help: "Total number of RPCs started on the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientHandledCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_handled_total",
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
clientStreamMsgReceived: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_msg_received_total",
Help: "Total number of RPC stream messages received by the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientStreamMsgSent: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientHandledHistogramEnabled: false,
clientHandledHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
Buckets: prom.DefBuckets,
},
clientHandledHistogram: nil,
}
}
// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
m.clientStartedCounter.Describe(ch)
m.clientHandledCounter.Describe(ch)
m.clientStreamMsgReceived.Describe(ch)
m.clientStreamMsgSent.Describe(ch)
if m.clientHandledHistogramEnabled {
m.clientHandledHistogram.Describe(ch)
}
}
// Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent.
func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
m.clientStartedCounter.Collect(ch)
m.clientHandledCounter.Collect(ch)
m.clientStreamMsgReceived.Collect(ch)
m.clientStreamMsgSent.Collect(ch)
if m.clientHandledHistogramEnabled {
m.clientHandledHistogram.Collect(ch)
}
}
// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientHandledHistogramOpts)
}
if !m.clientHandledHistogramEnabled {
m.clientHandledHistogram = prom.NewHistogramVec(
m.clientHandledHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
m.clientHandledHistogramEnabled = true
}
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
monitor := newClientReporter(m, Unary, method)
monitor.SentMessage()
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
monitor.ReceivedMessage()
}
st, _ := status.FromError(err)
monitor.Handled(st.Code())
return err
}
}
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
monitor := newClientReporter(m, clientStreamType(desc), method)
clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
st, _ := status.FromError(err)
monitor.Handled(st.Code())
return nil, err
}
return &monitoredClientStream{clientStream, monitor}, nil
}
}
func clientStreamType(desc *grpc.StreamDesc) grpcType {
if desc.ClientStreams && !desc.ServerStreams {
return ClientStream
} else if !desc.ClientStreams && desc.ServerStreams {
return ServerStream
}
return BidiStream
}
// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
type monitoredClientStream struct {
grpc.ClientStream
monitor *clientReporter
}
func (s *monitoredClientStream) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
if err == nil {
s.monitor.SentMessage()
}
return err
}
func (s *monitoredClientStream) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
if err == nil {
s.monitor.ReceivedMessage()
} else if err == io.EOF {
s.monitor.Handled(codes.OK)
} else {
st, _ := status.FromError(err)
s.monitor.Handled(st.Code())
}
return err
}
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_prometheus
import (
"time"
"google.golang.org/grpc/codes"
)
type clientReporter struct {
metrics *ClientMetrics
rpcType grpcType
serviceName string
methodName string
startTime time.Time
}
func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *clientReporter {
r := &clientReporter{
metrics: m,
rpcType: rpcType,
}
if r.metrics.clientHandledHistogramEnabled {
r.startTime = time.Now()
}
r.serviceName, r.methodName = splitMethodName(fullMethod)
r.metrics.clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
return r
}
func (r *clientReporter) ReceivedMessage() {
r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}
func (r *clientReporter) SentMessage() {
r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}
func (r *clientReporter) Handled(code codes.Code) {
r.metrics.clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
if r.metrics.clientHandledHistogramEnabled {
r.metrics.clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
}
}
SHELL="/bin/bash"
GOFILES_NOVENDOR = $(shell go list ./... | grep -v /vendor/)
all: vet fmt test
fmt:
go fmt $(GOFILES_NOVENDOR)
vet:
go vet $(GOFILES_NOVENDOR)
test: vet
./scripts/test_all.sh
.PHONY: all vet test
package grpc_prometheus
import (
prom "github.com/prometheus/client_golang/prometheus"
)
// A CounterOption lets you add options to Counter metrics using With* funcs.
type CounterOption func(*prom.CounterOpts)
type counterOptions []CounterOption
func (co counterOptions) apply(o prom.CounterOpts) prom.CounterOpts {
for _, f := range co {
f(&o)
}
return o
}
// WithConstLabels allows you to add ConstLabels to Counter metrics.
func WithConstLabels(labels prom.Labels) CounterOption {
return func(o *prom.CounterOpts) {
o.ConstLabels = labels
}
}
// A HistogramOption lets you add options to Histogram metrics using With*
// funcs.
type HistogramOption func(*prom.HistogramOpts)
// WithHistogramBuckets allows you to specify custom bucket ranges for histograms if EnableHandlingTimeHistogram is on.
func WithHistogramBuckets(buckets []float64) HistogramOption {
return func(o *prom.HistogramOpts) { o.Buckets = buckets }
}
// WithHistogramConstLabels allows you to add custom ConstLabels to
// histograms metrics.
func WithHistogramConstLabels(labels prom.Labels) HistogramOption {
return func(o *prom.HistogramOpts) {
o.ConstLabels = labels
}
}
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
// gRPC Prometheus monitoring interceptors for server-side gRPC.
package grpc_prometheus
import (
prom "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
var (
// DefaultServerMetrics is the default instance of ServerMetrics. It is
// intended to be used in conjunction the default Prometheus metrics
// registry.
DefaultServerMetrics = NewServerMetrics()
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
UnaryServerInterceptor = DefaultServerMetrics.UnaryServerInterceptor()
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
StreamServerInterceptor = DefaultServerMetrics.StreamServerInterceptor()
)
func init() {
prom.MustRegister(DefaultServerMetrics.serverStartedCounter)
prom.MustRegister(DefaultServerMetrics.serverHandledCounter)
prom.MustRegister(DefaultServerMetrics.serverStreamMsgReceived)
prom.MustRegister(DefaultServerMetrics.serverStreamMsgSent)
}
// Register takes a gRPC server and pre-initializes all counters to 0. This
// allows for easier monitoring in Prometheus (no missing metrics), and should
// be called *after* all services have been registered with the server. This
// function acts on the DefaultServerMetrics variable.
func Register(server *grpc.Server) {
DefaultServerMetrics.InitializeMetrics(server)
}
// EnableHandlingTimeHistogram turns on recording of handling time
// of RPCs. Histogram metrics can be very expensive for Prometheus
// to retain and query. This function acts on the DefaultServerMetrics
// variable and the default Prometheus metrics registry.
func EnableHandlingTimeHistogram(opts ...HistogramOption) {
DefaultServerMetrics.EnableHandlingTimeHistogram(opts...)
prom.Register(DefaultServerMetrics.serverHandledHistogram)
}
package grpc_prometheus
import (
prom "github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
)
// ServerMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC server.
type ServerMetrics struct {
serverStartedCounter *prom.CounterVec
serverHandledCounter *prom.CounterVec
serverStreamMsgReceived *prom.CounterVec
serverStreamMsgSent *prom.CounterVec
serverHandledHistogramEnabled bool
serverHandledHistogramOpts prom.HistogramOpts
serverHandledHistogram *prom.HistogramVec
}
// NewServerMetrics returns a ServerMetrics object. Use a new instance of
// ServerMetrics when not using the default Prometheus metrics registry, for
// example when wanting to control which metrics are added to a registry as
// opposed to automatically adding metrics via init functions.
func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
opts := counterOptions(counterOpts)
return &ServerMetrics{
serverStartedCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_started_total",
Help: "Total number of RPCs started on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_handled_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
serverStreamMsgReceived: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_msg_received_total",
Help: "Total number of RPC stream messages received on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverStreamMsgSent: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledHistogramEnabled: false,
serverHandledHistogramOpts: prom.HistogramOpts{
Name: "grpc_server_handling_seconds",
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prom.DefBuckets,
},
serverHandledHistogram: nil,
}
}
// EnableHandlingTimeHistogram enables histograms being registered when
// registering the ServerMetrics on a Prometheus registry. Histograms can be
// expensive on Prometheus servers. It takes options to configure histogram
// options such as the defined buckets.
func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.serverHandledHistogramOpts)
}
if !m.serverHandledHistogramEnabled {
m.serverHandledHistogram = prom.NewHistogramVec(
m.serverHandledHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
m.serverHandledHistogramEnabled = true
}
// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
m.serverStartedCounter.Describe(ch)
m.serverHandledCounter.Describe(ch)
m.serverStreamMsgReceived.Describe(ch)
m.serverStreamMsgSent.Describe(ch)
if m.serverHandledHistogramEnabled {
m.serverHandledHistogram.Describe(ch)
}
}
// Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent.
func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
m.serverStartedCounter.Collect(ch)
m.serverHandledCounter.Collect(ch)
m.serverStreamMsgReceived.Collect(ch)
m.serverStreamMsgSent.Collect(ch)
if m.serverHandledHistogramEnabled {
m.serverHandledHistogram.Collect(ch)
}
}
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
monitor := newServerReporter(m, Unary, info.FullMethod)
monitor.ReceivedMessage()
resp, err := handler(ctx, req)
st, _ := status.FromError(err)
monitor.Handled(st.Code())
if err == nil {
monitor.SentMessage()
}
return resp, err
}
}
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
err := handler(srv, &monitoredServerStream{ss, monitor})
st, _ := status.FromError(err)
monitor.Handled(st.Code())
return err
}
}
// InitializeMetrics initializes all metrics, with their appropriate null
// value, for all gRPC methods registered on a gRPC server. This is useful, to
// ensure that all metrics exist when collecting and querying.
func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
serviceInfo := server.GetServiceInfo()
for serviceName, info := range serviceInfo {
for _, mInfo := range info.Methods {
preRegisterMethod(m, serviceName, &mInfo)
}
}
}
func streamRPCType(info *grpc.StreamServerInfo) grpcType {
if info.IsClientStream && !info.IsServerStream {
return ClientStream
} else if !info.IsClientStream && info.IsServerStream {
return ServerStream
}
return BidiStream
}
// monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
type monitoredServerStream struct {
grpc.ServerStream
monitor *serverReporter
}
func (s *monitoredServerStream) SendMsg(m interface{}) error {
err := s.ServerStream.SendMsg(m)
if err == nil {
s.monitor.SentMessage()
}
return err
}
func (s *monitoredServerStream) RecvMsg(m interface{}) error {
err := s.ServerStream.RecvMsg(m)
if err == nil {
s.monitor.ReceivedMessage()
}
return err
}
// preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
methodName := mInfo.Name
methodType := string(typeFromMethodInfo(mInfo))
// These are just references (no increments), as just referencing will create the labels but not set values.
metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
if metrics.serverHandledHistogramEnabled {
metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
}
for _, code := range allCodes {
metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
}
}
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_prometheus
import (
"time"
"google.golang.org/grpc/codes"
)
type serverReporter struct {
metrics *ServerMetrics
rpcType grpcType
serviceName string
methodName string
startTime time.Time
}
func newServerReporter(m *ServerMetrics, rpcType grpcType, fullMethod string) *serverReporter {
r := &serverReporter{
metrics: m,
rpcType: rpcType,
}
if r.metrics.serverHandledHistogramEnabled {
r.startTime = time.Now()
}
r.serviceName, r.methodName = splitMethodName(fullMethod)
r.metrics.serverStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
return r
}
func (r *serverReporter) ReceivedMessage() {
r.metrics.serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}
func (r *serverReporter) SentMessage() {
r.metrics.serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}
func (r *serverReporter) Handled(code codes.Code) {
r.metrics.serverHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
if r.metrics.serverHandledHistogramEnabled {
r.metrics.serverHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
}
}
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_prometheus
import (
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
type grpcType string
const (
Unary grpcType = "unary"
ClientStream grpcType = "client_stream"
ServerStream grpcType = "server_stream"
BidiStream grpcType = "bidi_stream"
)
var (
allCodes = []codes.Code{
codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound,
codes.AlreadyExists, codes.PermissionDenied, codes.Unauthenticated, codes.ResourceExhausted,
codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal,
codes.Unavailable, codes.DataLoss,
}
)
func splitMethodName(fullMethodName string) (string, string) {
fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
if i := strings.Index(fullMethodName, "/"); i >= 0 {
return fullMethodName[:i], fullMethodName[i+1:]
}
return "unknown", "unknown"
}
func typeFromMethodInfo(mInfo *grpc.MethodInfo) grpcType {
if !mInfo.IsClientStream && !mInfo.IsServerStream {
return Unary
}
if mInfo.IsClientStream && !mInfo.IsServerStream {
return ClientStream
}
if !mInfo.IsClientStream && mInfo.IsServerStream {
return ServerStream
}
return BidiStream
}
......@@ -148,12 +148,10 @@ github.com/gosimple/slug
# github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-model/go/datasource
github.com/grafana/grafana-plugin-model/go/renderer
# github.com/grafana/grafana-plugin-sdk-go v0.11.0
github.com/grafana/grafana-plugin-sdk-go/backend
# github.com/grafana/grafana-plugin-sdk-go v0.14.0
github.com/grafana/grafana-plugin-sdk-go/backend/plugin
github.com/grafana/grafana-plugin-sdk-go/dataframe
github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2
# github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/go-grpc-prometheus
# github.com/hashicorp/go-hclog v0.8.0
github.com/hashicorp/go-hclog
# github.com/hashicorp/go-plugin v1.0.1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment