Commit 42b745a0 by Andrej Ocenas Committed by GitHub

Provisioning: Add API endpoint to reload provisioning configs (#16579)

* Add api to reaload provisioning

* Refactor and simplify the polling code

* Add test for the provisioning service

* Fix provider initialization and move some code to file reader

* Simplify the code and move initialization

* Remove unused code

* Update comment

* Add comment

* Change error messages

* Add DashboardProvisionerFactory type

* Update imports

* Use new assert lib

* Use mutext for synchronizing the reloading

* Fix typo

Co-Authored-By: aocenas <mr.ocenas@gmail.com>

* Add docs about the new api
parent b3bfbc6f
...@@ -447,3 +447,36 @@ Content-Type: application/json ...@@ -447,3 +447,36 @@ Content-Type: application/json
"message": "User auth token revoked" "message": "User auth token revoked"
} }
``` ```
## Reload provisioning configurations
`POST /api/admin/provisioning/dashboards/reload`
`POST /api/admin/provisioning/datasources/reload`
`POST /api/admin/provisioning/notifications/reload`
Reloads the provisioning config files for specified type and provision entities again. It won't return
until the new provisioned entities are already stored in the database. In case of dashboards, it will stop
polling for changes in dashboard files and then restart it with new configs after returning.
Only works with Basic Authentication (username and password). See [introduction](http://docs.grafana.org/http_api/admin/#admin-api) for an explanation.
**Example Request**:
```http
POST /api/admin/provisioning/dashboards/reload HTTP/1.1
Accept: application/json
Content-Type: application/json
```
**Example Response**:
```http
HTTP/1.1 200
Content-Type: application/json
{
"message": "Dashboards config reloaded"
}
```
package api
import (
"context"
"github.com/grafana/grafana/pkg/models"
)
func (server *HTTPServer) AdminProvisioningReloadDasboards(c *models.ReqContext) Response {
err := server.ProvisioningService.ProvisionDashboards()
if err != nil && err != context.Canceled {
return Error(500, "", err)
}
return Success("Dashboards config reloaded")
}
func (server *HTTPServer) AdminProvisioningReloadDatasources(c *models.ReqContext) Response {
err := server.ProvisioningService.ProvisionDatasources()
if err != nil {
return Error(500, "", err)
}
return Success("Datasources config reloaded")
}
func (server *HTTPServer) AdminProvisioningReloadNotifications(c *models.ReqContext) Response {
err := server.ProvisioningService.ProvisionNotifications()
if err != nil {
return Error(500, "", err)
}
return Success("Notifications config reloaded")
}
...@@ -387,6 +387,10 @@ func (hs *HTTPServer) registerRoutes() { ...@@ -387,6 +387,10 @@ func (hs *HTTPServer) registerRoutes() {
adminRoute.Post("/users/:id/logout", Wrap(hs.AdminLogoutUser)) adminRoute.Post("/users/:id/logout", Wrap(hs.AdminLogoutUser))
adminRoute.Get("/users/:id/auth-tokens", Wrap(hs.AdminGetUserAuthTokens)) adminRoute.Get("/users/:id/auth-tokens", Wrap(hs.AdminGetUserAuthTokens))
adminRoute.Post("/users/:id/revoke-auth-token", bind(m.RevokeAuthTokenCmd{}), Wrap(hs.AdminRevokeUserAuthToken)) adminRoute.Post("/users/:id/revoke-auth-token", bind(m.RevokeAuthTokenCmd{}), Wrap(hs.AdminRevokeUserAuthToken))
adminRoute.Post("/provisioning/dashboards/reload", Wrap(hs.AdminProvisioningReloadDasboards))
adminRoute.Post("/provisioning/datasources/reload", Wrap(hs.AdminProvisioningReloadDatasources))
adminRoute.Post("/provisioning/notifications/reload", Wrap(hs.AdminProvisioningReloadNotifications))
}, reqGrafanaAdmin) }, reqGrafanaAdmin)
// rendering // rendering
......
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"github.com/grafana/grafana/pkg/services/cache" "github.com/grafana/grafana/pkg/services/cache"
"github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/hooks" "github.com/grafana/grafana/pkg/services/hooks"
"github.com/grafana/grafana/pkg/services/provisioning"
"github.com/grafana/grafana/pkg/services/quota" "github.com/grafana/grafana/pkg/services/quota"
"github.com/grafana/grafana/pkg/services/rendering" "github.com/grafana/grafana/pkg/services/rendering"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
...@@ -48,16 +49,17 @@ type HTTPServer struct { ...@@ -48,16 +49,17 @@ type HTTPServer struct {
streamManager *live.StreamManager streamManager *live.StreamManager
httpSrv *http.Server httpSrv *http.Server
RouteRegister routing.RouteRegister `inject:""` RouteRegister routing.RouteRegister `inject:""`
Bus bus.Bus `inject:""` Bus bus.Bus `inject:""`
RenderService rendering.Service `inject:""` RenderService rendering.Service `inject:""`
Cfg *setting.Cfg `inject:""` Cfg *setting.Cfg `inject:""`
HooksService *hooks.HooksService `inject:""` HooksService *hooks.HooksService `inject:""`
CacheService *cache.CacheService `inject:""` CacheService *cache.CacheService `inject:""`
DatasourceCache datasources.CacheService `inject:""` DatasourceCache datasources.CacheService `inject:""`
AuthTokenService models.UserTokenService `inject:""` AuthTokenService models.UserTokenService `inject:""`
QuotaService *quota.QuotaService `inject:""` QuotaService *quota.QuotaService `inject:""`
RemoteCacheService *remotecache.RemoteCache `inject:""` RemoteCacheService *remotecache.RemoteCache `inject:""`
ProvisioningService provisioning.ProvisioningService `inject:""`
} }
func (hs *HTTPServer) Init() error { func (hs *HTTPServer) Init() error {
......
...@@ -3,44 +3,79 @@ package dashboards ...@@ -3,44 +3,79 @@ package dashboards
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/log"
"github.com/pkg/errors"
) )
type DashboardProvisioner struct { type DashboardProvisioner interface {
cfgReader *configReader Provision() error
log log.Logger PollChanges(ctx context.Context)
}
type DashboardProvisionerImpl struct {
log log.Logger
fileReaders []*fileReader
} }
func NewDashboardProvisioner(configDirectory string) *DashboardProvisioner { type DashboardProvisionerFactory func(string) (DashboardProvisioner, error)
log := log.New("provisioning.dashboard")
d := &DashboardProvisioner{ func NewDashboardProvisionerImpl(configDirectory string) (*DashboardProvisionerImpl, error) {
cfgReader: &configReader{path: configDirectory, log: log}, logger := log.New("provisioning.dashboard")
log: log, cfgReader := &configReader{path: configDirectory, log: logger}
configs, err := cfgReader.readConfig()
if err != nil {
return nil, errors.Wrap(err, "Failed to read dashboards config")
} }
return d fileReaders, err := getFileReaders(configs, logger)
}
func (provider *DashboardProvisioner) Provision(ctx context.Context) error {
cfgs, err := provider.cfgReader.readConfig()
if err != nil { if err != nil {
return err return nil, errors.Wrap(err, "Failed to initialize file readers")
}
d := &DashboardProvisionerImpl{
log: logger,
fileReaders: fileReaders,
} }
for _, cfg := range cfgs { return d, nil
switch cfg.Type { }
func (provider *DashboardProvisionerImpl) Provision() error {
for _, reader := range provider.fileReaders {
err := reader.startWalkingDisk()
if err != nil {
return errors.Wrapf(err, "Failed to provision config %v", reader.Cfg.Name)
}
}
return nil
}
// PollChanges starts polling for changes in dashboard definition files. It creates goroutine for each provider
// defined in the config.
func (provider *DashboardProvisionerImpl) PollChanges(ctx context.Context) {
for _, reader := range provider.fileReaders {
go reader.pollChanges(ctx)
}
}
func getFileReaders(configs []*DashboardsAsConfig, logger log.Logger) ([]*fileReader, error) {
var readers []*fileReader
for _, config := range configs {
switch config.Type {
case "file": case "file":
fileReader, err := NewDashboardFileReader(cfg, provider.log.New("type", cfg.Type, "name", cfg.Name)) fileReader, err := NewDashboardFileReader(config, logger.New("type", config.Type, "name", config.Name))
if err != nil { if err != nil {
return err return nil, errors.Wrapf(err, "Failed to create file reader for config %v", config.Name)
} }
readers = append(readers, fileReader)
go fileReader.ReadAndListen(ctx)
default: default:
return fmt.Errorf("type %s is not supported", cfg.Type) return nil, fmt.Errorf("type %s is not supported", config.Type)
} }
} }
return nil return readers, nil
} }
package dashboards
import "context"
type Calls struct {
Provision []interface{}
PollChanges []interface{}
}
type DashboardProvisionerMock struct {
Calls *Calls
ProvisionFunc func() error
PollChangesFunc func(ctx context.Context)
}
func NewDashboardProvisionerMock() *DashboardProvisionerMock {
return &DashboardProvisionerMock{
Calls: &Calls{},
}
}
func (dpm *DashboardProvisionerMock) Provision() error {
dpm.Calls.Provision = append(dpm.Calls.Provision, nil)
if dpm.ProvisionFunc != nil {
return dpm.ProvisionFunc()
} else {
return nil
}
}
func (dpm *DashboardProvisionerMock) PollChanges(ctx context.Context) {
dpm.Calls.PollChanges = append(dpm.Calls.PollChanges, ctx)
if dpm.PollChangesFunc != nil {
dpm.PollChangesFunc(ctx)
}
}
...@@ -51,35 +51,25 @@ func NewDashboardFileReader(cfg *DashboardsAsConfig, log log.Logger) (*fileReade ...@@ -51,35 +51,25 @@ func NewDashboardFileReader(cfg *DashboardsAsConfig, log log.Logger) (*fileReade
}, nil }, nil
} }
func (fr *fileReader) ReadAndListen(ctx context.Context) error { // pollChanges periodically runs startWalkingDisk based on interval specified in the config.
if err := fr.startWalkingDisk(); err != nil { func (fr *fileReader) pollChanges(ctx context.Context) {
fr.log.Error("failed to search for dashboards", "error", err) ticker := time.Tick(time.Duration(int64(time.Second) * fr.Cfg.UpdateIntervalSeconds))
}
ticker := time.NewTicker(time.Duration(int64(time.Second) * fr.Cfg.UpdateIntervalSeconds))
running := false
for { for {
select { select {
case <-ticker.C: case <-ticker:
if !running { // avoid walking the filesystem in parallel. in-case fs is very slow. if err := fr.startWalkingDisk(); err != nil {
running = true fr.log.Error("failed to search for dashboards", "error", err)
go func() {
if err := fr.startWalkingDisk(); err != nil {
fr.log.Error("failed to search for dashboards", "error", err)
}
running = false
}()
} }
case <-ctx.Done(): case <-ctx.Done():
return nil return
} }
} }
} }
// startWalkingDisk finds and saves dashboards on disk. // startWalkingDisk traverses the file system for defined path, reads dashboard definition files and applies any change
// to the database.
func (fr *fileReader) startWalkingDisk() error { func (fr *fileReader) startWalkingDisk() error {
fr.log.Debug("Start walking disk", "path", fr.Path)
resolvedPath := fr.resolvePath(fr.Path) resolvedPath := fr.resolvePath(fr.Path)
if _, err := os.Stat(resolvedPath); err != nil { if _, err := os.Stat(resolvedPath); err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
......
...@@ -2,8 +2,10 @@ package provisioning ...@@ -2,8 +2,10 @@ package provisioning
import ( import (
"context" "context"
"fmt" "github.com/grafana/grafana/pkg/log"
"github.com/pkg/errors"
"path" "path"
"sync"
"github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/provisioning/dashboards" "github.com/grafana/grafana/pkg/services/provisioning/dashboards"
...@@ -13,35 +15,122 @@ import ( ...@@ -13,35 +15,122 @@ import (
) )
func init() { func init() {
registry.RegisterService(&ProvisioningService{}) registry.RegisterService(NewProvisioningServiceImpl(
func(path string) (dashboards.DashboardProvisioner, error) {
return dashboards.NewDashboardProvisionerImpl(path)
},
notifiers.Provision,
datasources.Provision,
))
} }
type ProvisioningService struct { type ProvisioningService interface {
Cfg *setting.Cfg `inject:""` ProvisionDatasources() error
ProvisionNotifications() error
ProvisionDashboards() error
} }
func (ps *ProvisioningService) Init() error { func NewProvisioningServiceImpl(
datasourcePath := path.Join(ps.Cfg.ProvisioningPath, "datasources") newDashboardProvisioner dashboards.DashboardProvisionerFactory,
if err := datasources.Provision(datasourcePath); err != nil { provisionNotifiers func(string) error,
return fmt.Errorf("Datasource provisioning error: %v", err) provisionDatasources func(string) error,
) *provisioningServiceImpl {
return &provisioningServiceImpl{
log: log.New("provisioning"),
newDashboardProvisioner: newDashboardProvisioner,
provisionNotifiers: provisionNotifiers,
provisionDatasources: provisionDatasources,
} }
}
alertNotificationsPath := path.Join(ps.Cfg.ProvisioningPath, "notifiers") type provisioningServiceImpl struct {
if err := notifiers.Provision(alertNotificationsPath); err != nil { Cfg *setting.Cfg `inject:""`
return fmt.Errorf("Alert notification provisioning error: %v", err) log log.Logger
pollingCtxCancel context.CancelFunc
newDashboardProvisioner dashboards.DashboardProvisionerFactory
dashboardProvisioner dashboards.DashboardProvisioner
provisionNotifiers func(string) error
provisionDatasources func(string) error
mutex sync.Mutex
}
func (ps *provisioningServiceImpl) Init() error {
err := ps.ProvisionDatasources()
if err != nil {
return err
}
err = ps.ProvisionNotifications()
if err != nil {
return err
}
err = ps.ProvisionDashboards()
if err != nil {
return err
} }
return nil return nil
} }
func (ps *ProvisioningService) Run(ctx context.Context) error { func (ps *provisioningServiceImpl) Run(ctx context.Context) error {
for {
// Wait for unlock. This is tied to new dashboardProvisioner to be instantiated before we start polling.
ps.mutex.Lock()
pollingContext, cancelFun := context.WithCancel(ctx)
ps.pollingCtxCancel = cancelFun
ps.dashboardProvisioner.PollChanges(pollingContext)
ps.mutex.Unlock()
select {
case <-pollingContext.Done():
// Polling was canceled.
continue
case <-ctx.Done():
// Root server context was cancelled so just leave.
return ctx.Err()
}
}
}
func (ps *provisioningServiceImpl) ProvisionDatasources() error {
datasourcePath := path.Join(ps.Cfg.ProvisioningPath, "datasources")
err := ps.provisionDatasources(datasourcePath)
return errors.Wrap(err, "Datasource provisioning error")
}
func (ps *provisioningServiceImpl) ProvisionNotifications() error {
alertNotificationsPath := path.Join(ps.Cfg.ProvisioningPath, "notifiers")
err := ps.provisionNotifiers(alertNotificationsPath)
return errors.Wrap(err, "Alert notification provisioning error")
}
func (ps *provisioningServiceImpl) ProvisionDashboards() error {
dashboardPath := path.Join(ps.Cfg.ProvisioningPath, "dashboards") dashboardPath := path.Join(ps.Cfg.ProvisioningPath, "dashboards")
dashProvisioner := dashboards.NewDashboardProvisioner(dashboardPath) dashProvisioner, err := ps.newDashboardProvisioner(dashboardPath)
if err != nil {
return errors.Wrap(err, "Failed to create provisioner")
}
if err := dashProvisioner.Provision(ctx); err != nil { ps.mutex.Lock()
return err defer ps.mutex.Unlock()
ps.cancelPolling()
if err := dashProvisioner.Provision(); err != nil {
// If we fail to provision with the new provisioner, mutex will unlock and the polling we restart with the
// old provisioner as we did not switch them yet.
return errors.Wrap(err, "Failed to provision dashboards")
} }
ps.dashboardProvisioner = dashProvisioner
return nil
}
<-ctx.Done() func (ps *provisioningServiceImpl) cancelPolling() {
return ctx.Err() if ps.pollingCtxCancel != nil {
ps.log.Debug("Stop polling for dashboard changes")
ps.pollingCtxCancel()
}
ps.pollingCtxCancel = nil
} }
package provisioning
import (
"context"
"errors"
"github.com/grafana/grafana/pkg/services/provisioning/dashboards"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestProvisioningServiceImpl(t *testing.T) {
t.Run("Restart dashboard provisioning and stop service", func(t *testing.T) {
service, mock := setup()
ctx, cancel := context.WithCancel(context.Background())
var serviceRunning bool
var serviceError error
err := service.ProvisionDashboards()
assert.Nil(t, err)
go func() {
serviceRunning = true
serviceError = service.Run(ctx)
serviceRunning = false
}()
time.Sleep(time.Millisecond)
assert.Equal(t, 1, len(mock.Calls.PollChanges), "PollChanges should have been called")
err = service.ProvisionDashboards()
assert.Nil(t, err)
time.Sleep(time.Millisecond)
assert.Equal(t, 2, len(mock.Calls.PollChanges), "PollChanges should have been called 2 times")
pollingCtx := mock.Calls.PollChanges[0].(context.Context)
assert.Equal(t, context.Canceled, pollingCtx.Err(), "Polling context from first call should have been cancelled")
assert.True(t, serviceRunning, "Service should be still running")
// Cancelling the root context and stopping the service
cancel()
time.Sleep(time.Millisecond)
assert.False(t, serviceRunning, "Service should not be running")
assert.Equal(t, context.Canceled, serviceError, "Service should have returned canceled error")
})
t.Run("Failed reloading does not stop polling with old provisioned", func(t *testing.T) {
service, mock := setup()
ctx, cancel := context.WithCancel(context.Background())
var serviceRunning bool
err := service.ProvisionDashboards()
assert.Nil(t, err)
go func() {
serviceRunning = true
_ = service.Run(ctx)
serviceRunning = false
}()
time.Sleep(time.Millisecond)
assert.Equal(t, 1, len(mock.Calls.PollChanges), "PollChanges should have been called")
mock.ProvisionFunc = func() error {
return errors.New("Test error")
}
err = service.ProvisionDashboards()
assert.NotNil(t, err)
time.Sleep(time.Millisecond)
// This should have been called with the old provisioner, after the last one failed.
assert.Equal(t, 2, len(mock.Calls.PollChanges), "PollChanges should have been called 2 times")
assert.True(t, serviceRunning, "Service should be still running")
// Cancelling the root context and stopping the service
cancel()
})
}
func setup() (*provisioningServiceImpl, *dashboards.DashboardProvisionerMock) {
dashMock := dashboards.NewDashboardProvisionerMock()
service := NewProvisioningServiceImpl(
func(path string) (dashboards.DashboardProvisioner, error) {
return dashMock, nil
},
nil,
nil,
)
service.Cfg = setting.NewCfg()
return service, dashMock
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment