Commit 1a810ebf by Marcus Efraimsson Committed by GitHub

Plugins: Move backend plugin manager to service (#21474)

Moves backend plugin manager to service instead of
global functions in backendplugin package.

Closes #20053
parent bb849d53
......@@ -6,6 +6,7 @@ import (
"github.com/gosimple/slug"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/setting"
)
......@@ -45,7 +46,7 @@ type JwtTokenAuth struct {
Params map[string]string `json:"params"`
}
func (app *AppPlugin) Load(decoder *json.Decoder, pluginDir string) error {
func (app *AppPlugin) Load(decoder *json.Decoder, pluginDir string, backendPluginManager backendplugin.Manager) error {
if err := decoder.Decode(app); err != nil {
return err
}
......
package backendplugin
import (
"context"
"errors"
datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource"
rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
plugin "github.com/hashicorp/go-plugin"
)
// BackendPlugin a registered backend plugin.
type BackendPlugin struct {
id string
executablePath string
managed bool
clientFactory func() *plugin.Client
client *plugin.Client
logger log.Logger
startFns PluginStartFuncs
}
func (p *BackendPlugin) start(ctx context.Context) error {
p.client = p.clientFactory()
rpcClient, err := p.client.Client()
if err != nil {
return err
}
var legacyClient *LegacyClient
var client *Client
if p.client.NegotiatedVersion() > 1 {
rawBackend, err := rpcClient.Dispense("backend")
if err != nil {
return err
}
rawTransform, err := rpcClient.Dispense("transform")
if err != nil {
return err
}
client = &Client{}
if rawBackend != nil {
if plugin, ok := rawBackend.(backend.BackendPlugin); ok {
client.BackendPlugin = plugin
}
}
if rawTransform != nil {
if plugin, ok := rawTransform.(backend.TransformPlugin); ok {
client.TransformPlugin = plugin
}
}
} else {
raw, err := rpcClient.Dispense(p.id)
if err != nil {
return err
}
legacyClient = &LegacyClient{}
if plugin, ok := raw.(datasourceV1.DatasourcePlugin); ok {
legacyClient.DatasourcePlugin = plugin
}
if plugin, ok := raw.(rendererV1.RendererPlugin); ok {
legacyClient.RendererPlugin = plugin
}
}
if legacyClient == nil && client == nil {
return errors.New("no compatible plugin implementation found")
}
if legacyClient != nil && p.startFns.OnLegacyStart != nil {
if err := p.startFns.OnLegacyStart(p.id, legacyClient, p.logger); err != nil {
return err
}
}
if client != nil && p.startFns.OnStart != nil {
if err := p.startFns.OnStart(p.id, client, p.logger); err != nil {
return err
}
}
return nil
}
func (p *BackendPlugin) stop() error {
if p.client != nil {
p.client.Kill()
}
return nil
}
......@@ -6,130 +6,58 @@ import (
"sync"
"time"
datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource"
rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry"
plugin "github.com/hashicorp/go-plugin"
"golang.org/x/xerrors"
)
var (
pluginsMu sync.RWMutex
plugins = make(map[string]*BackendPlugin)
logger = log.New("plugins.backend")
)
type BackendPlugin struct {
id string
executablePath string
managed bool
clientFactory func() *plugin.Client
client *plugin.Client
logger log.Logger
startFns PluginStartFuncs
supportsMetrics bool
supportsHealth bool
func init() {
registry.Register(&registry.Descriptor{
Name: "BackendPluginManager",
Instance: &manager{},
InitPriority: registry.Low,
})
}
func (p *BackendPlugin) start(ctx context.Context) error {
p.client = p.clientFactory()
rpcClient, err := p.client.Client()
if err != nil {
return err
}
var legacyClient *LegacyClient
var client *Client
if p.client.NegotiatedVersion() > 1 {
rawBackend, err := rpcClient.Dispense("backend")
if err != nil {
return err
}
rawTransform, err := rpcClient.Dispense("transform")
if err != nil {
return err
}
client = &Client{}
if rawBackend != nil {
if plugin, ok := rawBackend.(backend.BackendPlugin); ok {
client.BackendPlugin = plugin
}
}
if rawTransform != nil {
if plugin, ok := rawTransform.(backend.TransformPlugin); ok {
client.TransformPlugin = plugin
}
}
} else {
raw, err := rpcClient.Dispense(p.id)
if err != nil {
return err
}
legacyClient = &LegacyClient{}
if plugin, ok := raw.(datasourceV1.DatasourcePlugin); ok {
legacyClient.DatasourcePlugin = plugin
}
if plugin, ok := raw.(rendererV1.RendererPlugin); ok {
legacyClient.RendererPlugin = plugin
}
}
if legacyClient == nil && client == nil {
return errors.New("no compatible plugin implementation found")
}
if legacyClient != nil && p.startFns.OnLegacyStart != nil {
if err := p.startFns.OnLegacyStart(p.id, legacyClient, p.logger); err != nil {
return err
}
}
if client != nil && p.startFns.OnStart != nil {
if err := p.startFns.OnStart(p.id, client, p.logger); err != nil {
return err
}
}
return nil
// Manager manages backend plugins.
type Manager interface {
// Register registers a backend plugin
Register(descriptor PluginDescriptor) error
// StartPlugin starts a non-managed backend plugin
StartPlugin(ctx context.Context, pluginID string) error
}
func (p *BackendPlugin) stop() error {
if p.client != nil {
p.client.Kill()
}
return nil
type manager struct {
pluginsMu sync.RWMutex
plugins map[string]*BackendPlugin
logger log.Logger
}
func (p *BackendPlugin) collectMetrics(ctx context.Context) {
if !p.supportsMetrics {
return
}
func (m *manager) Init() error {
m.plugins = make(map[string]*BackendPlugin)
m.logger = log.New("plugins.backend")
return nil
}
func (p *BackendPlugin) checkHealth(ctx context.Context) {
if !p.supportsHealth {
return
}
func (m *manager) Run(ctx context.Context) error {
m.start(ctx)
<-ctx.Done()
m.stop()
return ctx.Err()
}
// Register registers a backend plugin
func Register(descriptor PluginDescriptor) error {
logger.Debug("Registering backend plugin", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath)
pluginsMu.Lock()
defer pluginsMu.Unlock()
func (m *manager) Register(descriptor PluginDescriptor) error {
m.logger.Debug("Registering backend plugin", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath)
m.pluginsMu.Lock()
defer m.pluginsMu.Unlock()
if _, exists := plugins[descriptor.pluginID]; exists {
if _, exists := m.plugins[descriptor.pluginID]; exists {
return errors.New("Backend plugin already registered")
}
pluginLogger := logger.New("pluginId", descriptor.pluginID)
pluginLogger := m.logger.New("pluginId", descriptor.pluginID)
plugin := &BackendPlugin{
id: descriptor.pluginID,
executablePath: descriptor.executablePath,
......@@ -141,16 +69,16 @@ func Register(descriptor PluginDescriptor) error {
logger: pluginLogger,
}
plugins[descriptor.pluginID] = plugin
logger.Debug("Backend plugin registered", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath)
m.plugins[descriptor.pluginID] = plugin
m.logger.Debug("Backend plugin registered", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath)
return nil
}
// Start starts all managed backend plugins
func Start(ctx context.Context) {
pluginsMu.RLock()
defer pluginsMu.RUnlock()
for _, p := range plugins {
// start starts all managed backend plugins
func (m *manager) start(ctx context.Context) {
m.pluginsMu.RLock()
defer m.pluginsMu.RUnlock()
for _, p := range m.plugins {
if !p.managed {
continue
}
......@@ -162,10 +90,10 @@ func Start(ctx context.Context) {
}
// StartPlugin starts a non-managed backend plugin
func StartPlugin(ctx context.Context, pluginID string) error {
pluginsMu.RLock()
p, registered := plugins[pluginID]
pluginsMu.RUnlock()
func (m *manager) StartPlugin(ctx context.Context, pluginID string) error {
m.pluginsMu.RLock()
p, registered := m.plugins[pluginID]
m.pluginsMu.RUnlock()
if !registered {
return errors.New("Backend plugin not registered")
}
......@@ -177,25 +105,11 @@ func StartPlugin(ctx context.Context, pluginID string) error {
return startPluginAndRestartKilledProcesses(ctx, p)
}
func startPluginAndRestartKilledProcesses(ctx context.Context, p *BackendPlugin) error {
if err := p.start(ctx); err != nil {
return err
}
go func(ctx context.Context, p *BackendPlugin) {
if err := restartKilledProcess(ctx, p); err != nil {
p.logger.Error("Attempt to restart killed plugin process failed", "error", err)
}
}(ctx, p)
return nil
}
// Stop stops all managed backend plugins
func Stop() {
pluginsMu.RLock()
defer pluginsMu.RUnlock()
for _, p := range plugins {
// stop stops all managed backend plugins
func (m *manager) stop() {
m.pluginsMu.RLock()
defer m.pluginsMu.RUnlock()
for _, p := range m.plugins {
go func(p *BackendPlugin) {
p.logger.Debug("Stopping plugin")
if err := p.stop(); err != nil {
......@@ -206,18 +120,18 @@ func Stop() {
}
}
// CollectMetrics collect metrics from backend plugins
func CollectMetrics(ctx context.Context) {
for _, p := range plugins {
p.collectMetrics(ctx)
func startPluginAndRestartKilledProcesses(ctx context.Context, p *BackendPlugin) error {
if err := p.start(ctx); err != nil {
return err
}
}
// CheckHealth checks health of backend plugins
func CheckHealth(ctx context.Context) {
for _, p := range plugins {
p.checkHealth(ctx)
}
go func(ctx context.Context, p *BackendPlugin) {
if err := restartKilledProcess(ctx, p); err != nil {
p.logger.Error("Attempt to restart killed plugin process failed", "error", err)
}
}(ctx, p)
return nil
}
func restartKilledProcess(ctx context.Context, p *BackendPlugin) error {
......
......@@ -34,7 +34,7 @@ type DataSourcePlugin struct {
SDK bool `json:"sdk,omitempty"`
}
func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string) error {
func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string, backendPluginManager backendplugin.Manager) error {
if err := decoder.Decode(p); err != nil {
return errutil.Wrapf(err, "Failed to decode datasource plugin")
}
......@@ -50,7 +50,7 @@ func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string) error {
OnLegacyStart: p.onLegacyPluginStart,
OnStart: p.onPluginStart,
})
if err := backendplugin.Register(descriptor); err != nil {
if err := backendPluginManager.Register(descriptor); err != nil {
return errutil.Wrapf(err, "Failed to register backend plugin")
}
}
......
......@@ -6,6 +6,7 @@ import (
"strings"
m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/setting"
)
......@@ -32,7 +33,7 @@ func (e PluginNotFoundError) Error() string {
}
type PluginLoader interface {
Load(decoder *json.Decoder, pluginDir string) error
Load(decoder *json.Decoder, pluginDir string, backendPluginManager backendplugin.Manager) error
}
type PluginBase struct {
......
package plugins
import "encoding/json"
import (
"encoding/json"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
)
type PanelPlugin struct {
FrontendPluginBase
SkipDataQuery bool `json:"skipDataQuery"`
}
func (p *PanelPlugin) Load(decoder *json.Decoder, pluginDir string) error {
func (p *PanelPlugin) Load(decoder *json.Decoder, pluginDir string, backendPluginManager backendplugin.Manager) error {
if err := decoder.Decode(p); err != nil {
return err
}
......
......@@ -38,12 +38,14 @@ var (
)
type PluginScanner struct {
pluginPath string
errors []error
pluginPath string
errors []error
backendPluginManager backendplugin.Manager
}
type PluginManager struct {
log log.Logger
BackendPluginManager backendplugin.Manager `inject:""`
log log.Logger
}
func init() {
......@@ -112,7 +114,6 @@ func (pm *PluginManager) Init() error {
}
func (pm *PluginManager) Run(ctx context.Context) error {
backendplugin.Start(ctx)
pm.updateAppDashboards()
pm.checkForUpdates()
......@@ -128,8 +129,6 @@ func (pm *PluginManager) Run(ctx context.Context) error {
}
}
backendplugin.Stop()
return ctx.Err()
}
......@@ -156,7 +155,8 @@ func (pm *PluginManager) checkPluginPaths() error {
// scan a directory for plugins.
func (pm *PluginManager) scan(pluginDir string) error {
scanner := &PluginScanner{
pluginPath: pluginDir,
pluginPath: pluginDir,
backendPluginManager: pm.BackendPluginManager,
}
if err := util.Walk(pluginDir, true, true, scanner.walker); err != nil {
......@@ -247,7 +247,7 @@ func (scanner *PluginScanner) loadPluginJson(pluginJsonFilePath string) error {
if _, err := reader.Seek(0, 0); err != nil {
return err
}
return loader.Load(jsonParser, currentDir)
return loader.Load(jsonParser, currentDir, scanner.backendPluginManager)
}
func (scanner *PluginScanner) IsBackendOnlyPlugin(pluginType string) bool {
......
......@@ -14,11 +14,12 @@ import (
type RendererPlugin struct {
PluginBase
Executable string `json:"executable,omitempty"`
GrpcPlugin pluginModel.RendererPlugin
Executable string `json:"executable,omitempty"`
GrpcPlugin pluginModel.RendererPlugin
backendPluginManager backendplugin.Manager
}
func (r *RendererPlugin) Load(decoder *json.Decoder, pluginDir string) error {
func (r *RendererPlugin) Load(decoder *json.Decoder, pluginDir string, backendPluginManager backendplugin.Manager) error {
if err := decoder.Decode(r); err != nil {
return err
}
......@@ -27,12 +28,14 @@ func (r *RendererPlugin) Load(decoder *json.Decoder, pluginDir string) error {
return err
}
r.backendPluginManager = backendPluginManager
cmd := ComposePluginStartCommmand("plugin_start")
fullpath := path.Join(r.PluginDir, cmd)
descriptor := backendplugin.NewRendererPluginDescriptor(r.Id, fullpath, backendplugin.PluginStartFuncs{
OnLegacyStart: r.onLegacyPluginStart,
})
if err := backendplugin.Register(descriptor); err != nil {
if err := backendPluginManager.Register(descriptor); err != nil {
return errutil.Wrapf(err, "Failed to register backend plugin")
}
......@@ -41,7 +44,7 @@ func (r *RendererPlugin) Load(decoder *json.Decoder, pluginDir string) error {
}
func (r *RendererPlugin) Start(ctx context.Context) error {
if err := backendplugin.StartPlugin(ctx, r.Id); err != nil {
if err := r.backendPluginManager.StartPlugin(ctx, r.Id); err != nil {
return errutil.Wrapf(err, "Failed to start renderer plugin")
}
......
......@@ -28,7 +28,7 @@ type TransformPlugin struct {
*TransformWrapper
}
func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string) error {
func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string, backendPluginManager backendplugin.Manager) error {
if err := decoder.Decode(p); err != nil {
return err
}
......@@ -42,7 +42,7 @@ func (p *TransformPlugin) Load(decoder *json.Decoder, pluginDir string) error {
descriptor := backendplugin.NewBackendPluginDescriptor(p.Id, fullpath, backendplugin.PluginStartFuncs{
OnStart: p.onPluginStart,
})
if err := backendplugin.Register(descriptor); err != nil {
if err := backendPluginManager.Register(descriptor); err != nil {
return errutil.Wrapf(err, "Failed to register backend plugin")
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment