Commit 4ff613a4 by Marcus Efraimsson Committed by GitHub

Backend Plugins: Support handling of streaming resource response (#22580)

Use v0.19.0 of SDK.
Support handling of streaming resource response.
Disable gzip/compression middleware for resources 
to allow chunked/streaming response to clients the gzip
middleware had to be disabled since it buffers the full
response before sending it to the client.

Closes #22569

Co-Authored-By: Arve Knudsen <arve.knudsen@gmail.com>
parent f95c8b78
......@@ -32,7 +32,7 @@ require (
github.com/gorilla/websocket v1.4.1
github.com/gosimple/slug v1.4.2
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.16.0
github.com/grafana/grafana-plugin-sdk-go v0.19.0
github.com/hashicorp/go-hclog v0.8.0
github.com/hashicorp/go-plugin v1.0.1
github.com/hashicorp/go-version v1.1.0
......
......@@ -133,10 +133,8 @@ github.com/gosimple/slug v1.4.2 h1:jDmprx3q/9Lfk4FkGZtvzDQ9Cj9eAmsjzeQGp24PeiQ=
github.com/gosimple/slug v1.4.2/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SPdxCL9BChFTlyi0Khv64vdCW4TMna8+sxL7+Chx+Ag=
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To=
github.com/grafana/grafana-plugin-sdk-go v0.16.0 h1:fuoLzsQLs0RKcvXDP/cAQxaZGP1rbnoBwUaY/1yvh6k=
github.com/grafana/grafana-plugin-sdk-go v0.16.0/go.mod h1:D1MkO+4EPCWc1Wrr260hq7wbo7Ox0grnNWBygulq7aM=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grafana/grafana-plugin-sdk-go v0.19.0 h1:qLq8tOSxZ9O7+AdduXJVU6jEOlg/2eP8UXdhAzQ81g0=
github.com/grafana/grafana-plugin-sdk-go v0.19.0/go.mod h1:G6Ov9M+FDOZXNw8eKXINO6XzqdUvTs7huwyQp5jLTBQ=
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI=
github.com/hashicorp/go-hclog v0.8.0 h1:z3ollgGRg8RjfJH6UVBaG54R70GFd++QOkvnJH3VSBY=
github.com/hashicorp/go-hclog v0.8.0/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
......
......@@ -226,8 +226,8 @@ func (hs *HTTPServer) CheckHealth(c *models.ReqContext) Response {
}
payload := map[string]interface{}{
"status": resp.Status.String(),
"info": resp.Info,
"status": resp.Status.String(),
"message": resp.Message,
}
if resp.Status != backendplugin.HealthStatusOk {
......
......@@ -8,6 +8,8 @@ import (
"gopkg.in/macaron.v1"
)
const resourcesPath = "/resources"
func Gziper() macaron.Handler {
gziperLogger := log.New("gziper")
gziper := gzip.Gziper()
......@@ -27,6 +29,11 @@ func Gziper() macaron.Handler {
return
}
// ignore resources
if (strings.HasPrefix(requestPath, "/api/datasources/") || strings.HasPrefix(requestPath, "/api/plugins/")) && strings.Contains(requestPath, resourcesPath) {
return
}
if _, err := ctx.Invoke(gziper); err != nil {
gziperLogger.Error("Invoking gzip handler failed", "err", err)
}
......
......@@ -72,7 +72,7 @@ func (p *BackendPlugin) start(ctx context.Context) error {
if rawBackend != nil {
if plugin, ok := rawBackend.(CorePlugin); ok {
p.core = plugin
client.DatasourcePlugin = plugin
client.CorePlugin = plugin
}
}
......@@ -186,8 +186,8 @@ func (p *BackendPlugin) checkHealth(ctx context.Context) (*pluginv2.CheckHealth_
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
return &pluginv2.CheckHealth_Response{
Status: pluginv2.CheckHealth_Response_UNKNOWN,
Info: "Health check not implemented",
Status: pluginv2.CheckHealth_Response_UNKNOWN,
Message: "Health check not implemented",
}, nil
}
}
......@@ -197,9 +197,13 @@ func (p *BackendPlugin) checkHealth(ctx context.Context) (*pluginv2.CheckHealth_
return res, nil
}
func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceRequest) (*CallResourceResult, error) {
func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceRequest) (callResourceResultStream, error) {
p.logger.Debug("Calling resource", "path", req.Path, "method", req.Method)
if p.core == nil || p.client == nil || p.client.Exited() {
return nil, errors.New("plugin not running, cannot call resource")
}
reqHeaders := map[string]*pluginv2.CallResource_StringList{}
for k, v := range req.Headers {
reqHeaders[k] = &pluginv2.CallResource_StringList{Values: v}
......@@ -238,12 +242,14 @@ func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceReques
}
}
protoResp, err := p.core.CallResource(ctx, protoReq)
protoStream, err := p.core.CallResource(ctx, protoReq)
if err != nil {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
return &CallResourceResult{
Status: http.StatusNotImplemented,
return &singleCallResourceResult{
result: &CallResourceResult{
Status: http.StatusNotImplemented,
},
}, nil
}
}
......@@ -251,15 +257,8 @@ func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceReques
return nil, errutil.Wrap("Failed to call resource", err)
}
respHeaders := map[string][]string{}
for key, values := range protoResp.Headers {
respHeaders[key] = values.Values
}
return &CallResourceResult{
Headers: respHeaders,
Body: protoResp.Body,
Status: int(protoResp.Code),
return &callResourceResultStreamImpl{
stream: protoStream,
}, nil
}
......
......@@ -102,17 +102,11 @@ func NewRendererPluginDescriptor(pluginID, executablePath string, startFns Plugi
}
type DiagnosticsPlugin interface {
CollectMetrics(ctx context.Context, req *pluginv2.CollectMetrics_Request) (*pluginv2.CollectMetrics_Response, error)
CheckHealth(ctx context.Context, req *pluginv2.CheckHealth_Request) (*pluginv2.CheckHealth_Response, error)
}
type DatasourcePlugin interface {
DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error)
plugin.DiagnosticsServer
}
type CorePlugin interface {
CallResource(ctx context.Context, req *pluginv2.CallResource_Request) (*pluginv2.CallResource_Response, error)
DatasourcePlugin
plugin.CoreClient
}
type TransformPlugin interface {
......@@ -127,6 +121,6 @@ type LegacyClient struct {
// Client client for communicating with a plugin using the current plugin protocol.
type Client struct {
DatasourcePlugin DatasourcePlugin
TransformPlugin TransformPlugin
CorePlugin CorePlugin
TransformPlugin TransformPlugin
}
......@@ -37,8 +37,8 @@ func (hs HealthStatus) String() string {
// CheckHealthResult check health result.
type CheckHealthResult struct {
Status HealthStatus
Info string
Status HealthStatus
Message string
}
func checkHealthResultFromProto(protoResp *pluginv2.CheckHealth_Response) *CheckHealthResult {
......@@ -51,8 +51,8 @@ func checkHealthResultFromProto(protoResp *pluginv2.CheckHealth_Response) *Check
}
return &CheckHealthResult{
Status: status,
Info: protoResp.Info,
Status: status,
Message: protoResp.Message,
}
}
......@@ -91,3 +91,46 @@ type CallResourceResult struct {
Headers map[string][]string
Body []byte
}
type callResourceResultStream interface {
Recv() (*CallResourceResult, error)
Close() error
}
type callResourceResultStreamImpl struct {
stream pluginv2.Core_CallResourceClient
}
func (s *callResourceResultStreamImpl) Recv() (*CallResourceResult, error) {
protoResp, err := s.stream.Recv()
if err != nil {
return nil, err
}
respHeaders := map[string][]string{}
for key, values := range protoResp.Headers {
respHeaders[key] = values.Values
}
return &CallResourceResult{
Headers: respHeaders,
Body: protoResp.Body,
Status: int(protoResp.Code),
}, nil
}
func (s *callResourceResultStreamImpl) Close() error {
return s.stream.CloseSend()
}
type singleCallResourceResult struct {
result *CallResourceResult
}
func (s *singleCallResourceResult) Recv() (*CallResourceResult, error) {
return s.result, nil
}
func (s *singleCallResourceResult) Close() error {
return nil
}
......@@ -3,6 +3,7 @@ package backendplugin
import (
"context"
"errors"
"io"
"sync"
"time"
......@@ -209,30 +210,59 @@ func (m *manager) CallResource(config PluginConfig, c *models.ReqContext, path s
Body: body,
}
res, err := p.callResource(clonedReq.Context(), req)
stream, err := p.callResource(clonedReq.Context(), req)
if err != nil {
c.JsonApiErr(500, "Failed to call resource", err)
return
}
// Make sure a content type always is returned in response
if _, exists := res.Headers["Content-Type"]; !exists {
res.Headers["Content-Type"] = []string{"application/json"}
}
processedStreams := 0
for k, values := range res.Headers {
if k == "Set-Cookie" {
continue
for {
resp, err := stream.Recv()
if err == io.EOF {
if processedStreams == 0 {
c.JsonApiErr(500, "Received empty resource response ", nil)
}
return
}
if err != nil {
if processedStreams == 0 {
c.JsonApiErr(500, "Failed to receive response from resource call", err)
} else {
p.logger.Error("Failed to receive response from resource call", "error", err)
}
return
}
// Expected that headers and status are only part of first stream
if processedStreams == 0 {
// Make sure a content type always is returned in response
if _, exists := resp.Headers["Content-Type"]; !exists {
resp.Headers["Content-Type"] = []string{"application/json"}
}
for k, values := range resp.Headers {
// Due to security reasons we don't want to forward
// cookies from a backend plugin to clients/browsers.
if k == "Set-Cookie" {
continue
}
for _, v := range values {
c.Resp.Header().Add(k, v)
}
}
c.WriteHeader(resp.Status)
}
for _, v := range values {
c.Resp.Header().Add(k, v)
if _, err := c.Write(resp.Body); err != nil {
p.logger.Error("Failed to write resource response", "error", err)
}
}
c.WriteHeader(res.Status)
if _, err := c.Write(res.Body); err != nil {
p.logger.Error("Failed to write resource response", "error", err)
c.Resp.Flush()
processedStreams++
}
}
......
......@@ -13,12 +13,12 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
func NewDatasourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, plugin backendplugin.DatasourcePlugin) *DatasourcePluginWrapperV2 {
return &DatasourcePluginWrapperV2{DatasourcePlugin: plugin, logger: log, pluginId: pluginId, pluginType: pluginType}
func NewDatasourcePluginWrapperV2(log log.Logger, pluginId, pluginType string, plugin backendplugin.CorePlugin) *DatasourcePluginWrapperV2 {
return &DatasourcePluginWrapperV2{CorePlugin: plugin, logger: log, pluginId: pluginId, pluginType: pluginType}
}
type DatasourcePluginWrapperV2 struct {
backendplugin.DatasourcePlugin
backendplugin.CorePlugin
logger log.Logger
pluginId string
pluginType string
......@@ -68,7 +68,7 @@ func (tw *DatasourcePluginWrapperV2) Query(ctx context.Context, ds *models.DataS
})
}
pbRes, err := tw.DatasourcePlugin.DataQuery(ctx, pbQuery)
pbRes, err := tw.CorePlugin.DataQuery(ctx, pbQuery)
if err != nil {
return nil, err
}
......
......@@ -68,9 +68,9 @@ func (p *DataSourcePlugin) onLegacyPluginStart(pluginID string, client *backendp
}
func (p *DataSourcePlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error {
if client.DatasourcePlugin != nil {
if client.CorePlugin != nil {
tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.DatasourcePlugin), nil
return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.CorePlugin), nil
})
}
......
......@@ -53,9 +53,9 @@ func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string, backendP
func (p *TransformPlugin) onPluginStart(pluginID string, client *backendplugin.Client, logger log.Logger) error {
p.TransformWrapper = NewTransformWrapper(logger, client.TransformPlugin)
if client.DatasourcePlugin != nil {
if client.CorePlugin != nil {
tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.DatasourcePlugin), nil
return wrapper.NewDatasourcePluginWrapperV2(logger, p.Id, p.Type, client.CorePlugin), nil
})
}
......
......@@ -12,6 +12,10 @@ type CoreServer interface {
pluginv2.CoreServer
}
type CoreClient interface {
pluginv2.CoreClient
}
// CoreGRPCPlugin implements the GRPCPlugin interface from github.com/hashicorp/go-plugin.
type CoreGRPCPlugin struct {
plugin.NetRPCUnsupportedPlugin
......@@ -38,21 +42,21 @@ func (s *coreGRPCServer) DataQuery(ctx context.Context, req *pluginv2.DataQueryR
return s.server.DataQuery(ctx, req)
}
func (s *coreGRPCServer) CallResource(ctx context.Context, req *pluginv2.CallResource_Request) (*pluginv2.CallResource_Response, error) {
return s.server.CallResource(ctx, req)
func (s *coreGRPCServer) CallResource(req *pluginv2.CallResource_Request, srv pluginv2.Core_CallResourceServer) error {
return s.server.CallResource(req, srv)
}
type coreGRPCClient struct {
client pluginv2.CoreClient
}
func (m *coreGRPCClient) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error) {
return m.client.DataQuery(ctx, req)
func (m *coreGRPCClient) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest, opts ...grpc.CallOption) (*pluginv2.DataQueryResponse, error) {
return m.client.DataQuery(ctx, req, opts...)
}
func (m *coreGRPCClient) CallResource(ctx context.Context, req *pluginv2.CallResource_Request) (*pluginv2.CallResource_Response, error) {
return m.client.CallResource(ctx, req)
func (m *coreGRPCClient) CallResource(ctx context.Context, req *pluginv2.CallResource_Request, opts ...grpc.CallOption) (pluginv2.Core_CallResourceClient, error) {
return m.client.CallResource(ctx, req, opts...)
}
var _ CoreServer = &coreGRPCServer{}
var _ CoreServer = &coreGRPCClient{}
var _ CoreClient = &coreGRPCClient{}
......@@ -200,6 +200,13 @@ func buildArrowSchema(f *Frame, fs []arrow.Field) (*arrow.Schema, error) {
}
tableMetaMap["meta"] = str
}
if len(f.Warnings) > 0 {
str, err := toJSONString(f.Warnings)
if err != nil {
return nil, err
}
tableMetaMap["warnings"] = str
}
tableMeta := arrow.MetadataFrom(tableMetaMap)
return arrow.NewSchema(fs, &tableMeta), nil
......@@ -633,6 +640,14 @@ func UnmarshalArrow(b []byte) (*Frame, error) {
}
}
if warningsAsString, ok := getMDKey("warnings", metaData); ok {
var err error
frame.Warnings, err = WarningsFromJSON(warningsAsString)
if err != nil {
return nil, err
}
}
nullable, err := initializeFrameFields(schema, frame)
if err != nil {
return nil, err
......
......@@ -12,8 +12,9 @@ type Frame struct {
Name string
Fields []*Field
RefID string
Meta *QueryResultMeta
RefID string
Meta *QueryResultMeta
Warnings []Warning
}
// Field represents a column of data with a specific type.
......@@ -38,6 +39,11 @@ func (f *Frame) AppendRow(vals ...interface{}) {
}
}
// AppendWarning adds warnings to the data frame.
func (f *Frame) AppendWarning(message string, details string) {
f.Warnings = append(f.Warnings, Warning{Message: message, Details: details})
}
// AppendRowSafe adds a new row to the Frame by appending to each each element of vals to
// the corresponding Field in the dataframe. It has the some constraints as AppendRow but will
// return an error under those conditions instead of panicing.
......
package dataframe
import "encoding/json"
// Warning contains information about problems in a dataframe.
type Warning struct {
// Short message (typically shown in the header)
Message string `json:"message,omitempty"`
// longer error message, shown in the body
Details string `json:"details,omitempty"`
}
// WarningsFromJSON creates a *Warning from a json string.
func WarningsFromJSON(jsonStr string) ([]Warning, error) {
var m []Warning
err := json.Unmarshal([]byte(jsonStr), &m)
if err != nil {
return nil, err
}
return m, nil
}
......@@ -148,7 +148,7 @@ github.com/gosimple/slug
# github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-model/go/datasource
github.com/grafana/grafana-plugin-model/go/renderer
# github.com/grafana/grafana-plugin-sdk-go v0.16.0
# github.com/grafana/grafana-plugin-sdk-go v0.19.0
github.com/grafana/grafana-plugin-sdk-go/backend/plugin
github.com/grafana/grafana-plugin-sdk-go/dataframe
github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment