Commit 7653d8a1 by Carl Bergquist Committed by GitHub

Merge pull request #14468 from bergquist/db_lock

Infra package for creating distributed lock to make sure functions are executed once even in HA mode. 
parents c0fc236f 7aa84aeb
...@@ -28,6 +28,7 @@ import ( ...@@ -28,6 +28,7 @@ import (
// self registering services // self registering services
_ "github.com/grafana/grafana/pkg/extensions" _ "github.com/grafana/grafana/pkg/extensions"
_ "github.com/grafana/grafana/pkg/infra/serverlock"
_ "github.com/grafana/grafana/pkg/metrics" _ "github.com/grafana/grafana/pkg/metrics"
_ "github.com/grafana/grafana/pkg/plugins" _ "github.com/grafana/grafana/pkg/plugins"
_ "github.com/grafana/grafana/pkg/services/alerting" _ "github.com/grafana/grafana/pkg/services/alerting"
......
package serverlock
type serverLock struct {
Id int64
OperationUid string
LastExecution int64
Version int64
}
package serverlock
import (
"context"
"time"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/sqlstore"
)
func init() {
registry.RegisterService(&ServerLockService{})
}
// ServerLockService allows servers in HA mode to claim a lock
// and execute an function if the server was granted the lock
type ServerLockService struct {
SQLStore *sqlstore.SqlStore `inject:""`
log log.Logger
}
// Init this service
func (sl *ServerLockService) Init() error {
sl.log = log.New("infra.lockservice")
return nil
}
// LockAndExecute try to create a lock for this server and only executes the
// `fn` function when successful. This should not be used at low internal. But services
// that needs to be run once every ex 10m.
func (sl *ServerLockService) LockAndExecute(ctx context.Context, actionName string, maxInterval time.Duration, fn func()) error {
// gets or creates a lockable row
rowLock, err := sl.getOrCreate(ctx, actionName)
if err != nil {
return err
}
// avoid execution if last lock happened less than `maxInterval` ago
if rowLock.LastExecution != 0 {
lastExeuctionTime := time.Unix(rowLock.LastExecution, 0)
if lastExeuctionTime.Unix() > time.Now().Add(-maxInterval).Unix() {
return nil
}
}
// try to get lock based on rowLow version
acquiredLock, err := sl.acquireLock(ctx, rowLock)
if err != nil {
return err
}
if acquiredLock {
fn()
}
return nil
}
func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock) (bool, error) {
var result bool
err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
newVersion := serverLock.Version + 1
sql := `UPDATE server_lock SET
version = ?,
last_execution = ?
WHERE
id = ? AND version = ?`
res, err := dbSession.Exec(sql, newVersion, time.Now().Unix(), serverLock.Id, serverLock.Version)
if err != nil {
return err
}
affected, err := res.RowsAffected()
result = affected == 1
return err
})
return result, err
}
func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string) (*serverLock, error) {
var result *serverLock
err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
lockRows := []*serverLock{}
err := dbSession.Where("operation_uid = ?", actionName).Find(&lockRows)
if err != nil {
return err
}
if len(lockRows) > 0 {
result = lockRows[0]
return nil
}
lockRow := &serverLock{
OperationUid: actionName,
LastExecution: 0,
}
_, err = dbSession.Insert(lockRow)
if err != nil {
return err
}
result = lockRow
return nil
})
return result, err
}
// +build integration
package serverlock
import (
"context"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func TestServerLok(t *testing.T) {
sl := createTestableServerLock(t)
Convey("Server lock integration tests", t, func() {
counter := 0
var err error
incCounter := func() { counter++ }
atInterval := time.Second * 1
ctx := context.Background()
//this time `fn` should be executed
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)
//this should not execute `fn`
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)
So(sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter), ShouldBeNil)
// wait 5 second.
<-time.After(atInterval * 2)
// now `fn` should be executed again
err = sl.LockAndExecute(ctx, "test-operation", atInterval, incCounter)
So(err, ShouldBeNil)
So(counter, ShouldEqual, 2)
})
}
package serverlock
import (
"context"
"testing"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/services/sqlstore"
. "github.com/smartystreets/goconvey/convey"
)
func createTestableServerLock(t *testing.T) *ServerLockService {
t.Helper()
sqlstore := sqlstore.InitTestDB(t)
return &ServerLockService{
SQLStore: sqlstore,
log: log.New("test-logger"),
}
}
func TestServerLock(t *testing.T) {
Convey("Server lock", t, func() {
sl := createTestableServerLock(t)
operationUID := "test-operation"
first, err := sl.getOrCreate(context.Background(), operationUID)
So(err, ShouldBeNil)
lastExecution := first.LastExecution
Convey("trying to create three new row locks", func() {
for i := 0; i < 3; i++ {
first, err = sl.getOrCreate(context.Background(), operationUID)
So(err, ShouldBeNil)
So(first.OperationUid, ShouldEqual, operationUID)
So(first.Id, ShouldEqual, 1)
}
Convey("Should not create new since lock already exist", func() {
So(lastExecution, ShouldEqual, first.LastExecution)
})
})
Convey("Should be able to create lock on first row", func() {
gotLock, err := sl.acquireLock(context.Background(), first)
So(err, ShouldBeNil)
So(gotLock, ShouldBeTrue)
gotLock, err = sl.acquireLock(context.Background(), first)
So(err, ShouldBeNil)
So(gotLock, ShouldBeFalse)
})
})
}
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/infra/serverlock"
"github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/log"
m "github.com/grafana/grafana/pkg/models" m "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/registry"
...@@ -17,6 +18,7 @@ import ( ...@@ -17,6 +18,7 @@ import (
type CleanUpService struct { type CleanUpService struct {
log log.Logger log log.Logger
Cfg *setting.Cfg `inject:""` Cfg *setting.Cfg `inject:""`
ServerLockService *serverlock.ServerLockService `inject:""`
} }
func init() { func init() {
...@@ -38,7 +40,10 @@ func (srv *CleanUpService) Run(ctx context.Context) error { ...@@ -38,7 +40,10 @@ func (srv *CleanUpService) Run(ctx context.Context) error {
srv.cleanUpTmpFiles() srv.cleanUpTmpFiles()
srv.deleteExpiredSnapshots() srv.deleteExpiredSnapshots()
srv.deleteExpiredDashboardVersions() srv.deleteExpiredDashboardVersions()
srv.ServerLockService.LockAndExecute(ctx, "delete old login attempts", time.Minute*10, func() {
srv.deleteOldLoginAttempts() srv.deleteOldLoginAttempts()
})
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
} }
......
...@@ -31,6 +31,7 @@ func AddMigrations(mg *Migrator) { ...@@ -31,6 +31,7 @@ func AddMigrations(mg *Migrator) {
addTagMigration(mg) addTagMigration(mg)
addLoginAttemptMigrations(mg) addLoginAttemptMigrations(mg)
addUserAuthMigrations(mg) addUserAuthMigrations(mg)
addServerlockMigrations(mg)
} }
func addMigrationLogMigrations(mg *Migrator) { func addMigrationLogMigrations(mg *Migrator) {
......
package migrations
import "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
func addServerlockMigrations(mg *migrator.Migrator) {
serverLock := migrator.Table{
Name: "server_lock",
Columns: []*migrator.Column{
{Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "operation_uid", Type: migrator.DB_NVarchar, Length: 100},
{Name: "version", Type: migrator.DB_BigInt},
{Name: "last_execution", Type: migrator.DB_BigInt, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"operation_uid"}, Type: migrator.UniqueIndex},
},
}
mg.AddMigration("create server_lock table", migrator.NewAddTableMigration(serverLock))
mg.AddMigration("add index server_lock.operation_uid", migrator.NewAddIndexMigration(serverLock, serverLock.Indices[0]))
}
...@@ -19,5 +19,5 @@ exit_if_fail time go install ./pkg/cmd/grafana-server ...@@ -19,5 +19,5 @@ exit_if_fail time go install ./pkg/cmd/grafana-server
echo "running go test" echo "running go test"
set -e set -e
time for d in $(go list ./pkg/...); do time for d in $(go list ./pkg/...); do
exit_if_fail go test -covermode=atomic $d exit_if_fail go test -tags=integration -covermode=atomic $d
done done
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment