Commit 442e0e43 by Torkel Ödegaard Committed by bergquist

refactoring: transaction manager PR #12203

parent 6775a82c
...@@ -12,8 +12,8 @@ type Msg interface{} ...@@ -12,8 +12,8 @@ type Msg interface{}
var ErrHandlerNotFound = errors.New("handler not found") var ErrHandlerNotFound = errors.New("handler not found")
type TransactionWrapper interface { type TransactionManager interface {
Wrap(ctx context.Context, fn func(ctx context.Context) error) error InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
} }
type Bus interface { type Bus interface {
...@@ -35,19 +35,18 @@ type Bus interface { ...@@ -35,19 +35,18 @@ type Bus interface {
// SetTransactionManager allows the user to replace the internal // SetTransactionManager allows the user to replace the internal
// noop TransactionManager that is responsible for manageing // noop TransactionManager that is responsible for manageing
// transactions in `InTransaction` // transactions in `InTransaction`
SetTransactionManager(tm TransactionWrapper) SetTransactionManager(tm TransactionManager)
} }
func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
return b.transactionWrapper.Wrap(ctx, fn) return b.txMng.InTransaction(ctx, fn)
} }
type InProcBus struct { type InProcBus struct {
handlers map[string]HandlerFunc handlers map[string]HandlerFunc
listeners map[string][]HandlerFunc listeners map[string][]HandlerFunc
wildcardListeners []HandlerFunc wildcardListeners []HandlerFunc
txMng TransactionManager
transactionWrapper TransactionWrapper
} }
// temp stuff, not sure how to handle bus instance, and init yet // temp stuff, not sure how to handle bus instance, and init yet
...@@ -58,8 +57,7 @@ func New() Bus { ...@@ -58,8 +57,7 @@ func New() Bus {
bus.handlers = make(map[string]HandlerFunc) bus.handlers = make(map[string]HandlerFunc)
bus.listeners = make(map[string][]HandlerFunc) bus.listeners = make(map[string][]HandlerFunc)
bus.wildcardListeners = make([]HandlerFunc, 0) bus.wildcardListeners = make([]HandlerFunc, 0)
bus.txMng = &noopTransactionManager{}
bus.transactionWrapper = &noopTransactionManager{}
return bus return bus
} }
...@@ -69,12 +67,8 @@ func GetBus() Bus { ...@@ -69,12 +67,8 @@ func GetBus() Bus {
return globalBus return globalBus
} }
func SetTransactionManager(tm TransactionWrapper) { func (b *InProcBus) SetTransactionManager(tm TransactionManager) {
globalBus.SetTransactionManager(tm) b.txMng = tm
}
func (b *InProcBus) SetTransactionManager(tm TransactionWrapper) {
b.transactionWrapper = tm
} }
func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error {
...@@ -213,6 +207,6 @@ func ClearBusHandlers() { ...@@ -213,6 +207,6 @@ func ClearBusHandlers() {
type noopTransactionManager struct{} type noopTransactionManager struct{}
func (*noopTransactionManager) Wrap(ctx context.Context, fn func(ctx context.Context) error) error { func (*noopTransactionManager) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
return nil return nil
} }
package models
import "context"
type TransactionManager interface {
InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
}
...@@ -3,12 +3,8 @@ package sqlstore ...@@ -3,12 +3,8 @@ package sqlstore
import ( import (
"context" "context"
"reflect" "reflect"
"time"
"github.com/go-xorm/xorm" "github.com/go-xorm/xorm"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
sqlite3 "github.com/mattn/go-sqlite3"
) )
type DBSession struct { type DBSession struct {
...@@ -26,10 +22,6 @@ func newSession() *DBSession { ...@@ -26,10 +22,6 @@ func newSession() *DBSession {
return &DBSession{Session: x.NewSession()} return &DBSession{Session: x.NewSession()}
} }
func inTransaction(callback dbTransactionFunc) error {
return inTransactionWithRetry(callback, 0)
}
func startSession(ctx context.Context) *DBSession { func startSession(ctx context.Context) *DBSession {
value := ctx.Value(ContextSessionName) value := ctx.Value(ContextSessionName)
var sess *DBSession var sess *DBSession
...@@ -50,50 +42,6 @@ func withDbSession(ctx context.Context, callback dbTransactionFunc) error { ...@@ -50,50 +42,6 @@ func withDbSession(ctx context.Context, callback dbTransactionFunc) error {
return callback(sess) return callback(sess)
} }
func inTransactionWithRetry(callback dbTransactionFunc, retry int) error {
return inTransactionWithRetryCtx(context.Background(), callback, retry)
}
func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
var err error
sess := startSession(ctx)
defer sess.Close()
if err = sess.Begin(); err != nil {
return err
}
err = callback(sess)
// special handling of database locked errors for sqlite, then we can retry 3 times
if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
if sqlError.Code == sqlite3.ErrLocked {
sess.Rollback()
time.Sleep(time.Millisecond * time.Duration(10))
sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
return inTransactionWithRetry(callback, retry+1)
}
}
if err != nil {
sess.Rollback()
return err
} else if err = sess.Commit(); err != nil {
return err
}
if len(sess.events) > 0 {
for _, e := range sess.events {
if err = bus.Publish(e); err != nil {
log.Error(3, "Failed to publish event after commit", err)
}
}
}
return nil
}
func (sess *DBSession) InsertId(bean interface{}) (int64, error) { func (sess *DBSession) InsertId(bean interface{}) (int64, error) {
table := sess.DB().Mapper.Obj2Table(getTypeName(bean)) table := sess.DB().Mapper.Obj2Table(getTypeName(bean))
......
...@@ -23,11 +23,10 @@ import ( ...@@ -23,11 +23,10 @@ import (
"github.com/go-sql-driver/mysql" "github.com/go-sql-driver/mysql"
"github.com/go-xorm/xorm" "github.com/go-xorm/xorm"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
sqlite3 "github.com/mattn/go-sqlite3"
_ "github.com/grafana/grafana/pkg/tsdb/mssql" _ "github.com/grafana/grafana/pkg/tsdb/mssql"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
) )
var ( var (
...@@ -82,9 +81,7 @@ func (ss *SqlStore) Init() error { ...@@ -82,9 +81,7 @@ func (ss *SqlStore) Init() error {
// Init repo instances // Init repo instances
annotations.SetRepository(&SqlAnnotationRepo{}) annotations.SetRepository(&SqlAnnotationRepo{})
ss.Bus.SetTransactionManager(&SQLTransactionManager{ ss.Bus.SetTransactionManager(ss)
engine: ss.engine,
})
// ensure admin user // ensure admin user
if ss.skipEnsureAdmin { if ss.skipEnsureAdmin {
...@@ -94,57 +91,10 @@ func (ss *SqlStore) Init() error { ...@@ -94,57 +91,10 @@ func (ss *SqlStore) Init() error {
return ss.ensureAdminUser() return ss.ensureAdminUser()
} }
// SQLTransactionManager begin/end transaction
type SQLTransactionManager struct {
engine *xorm.Engine
}
func (stm *SQLTransactionManager) Wrap(ctx context.Context, fn func(ctx context.Context) error) error {
return stm.wrapInternal(ctx, fn, 0)
}
func (stm *SQLTransactionManager) wrapInternal(ctx context.Context, fn func(ctx context.Context) error, retry int) error {
sess := startSession(ctx)
defer sess.Close()
withValue := context.WithValue(ctx, ContextSessionName, sess)
err := fn(withValue)
// special handling of database locked errors for sqlite, then we can retry 3 times
if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
if sqlError.Code == sqlite3.ErrLocked {
sess.Rollback()
time.Sleep(time.Millisecond * time.Duration(10))
sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
return stm.wrapInternal(ctx, fn, retry+1)
}
}
if err != nil {
sess.Rollback()
return err
}
if err = sess.Commit(); err != nil {
return err
}
if len(sess.events) > 0 {
for _, e := range sess.events {
if err = bus.Publish(e); err != nil {
log.Error(3, "Failed to publish event after commit", err)
}
}
}
return nil
}
func (ss *SqlStore) ensureAdminUser() error { func (ss *SqlStore) ensureAdminUser() error {
systemUserCountQuery := m.GetSystemUserCountStatsQuery{} systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
err := bus.InTransaction(context.Background(), func(ctx context.Context) error { err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
err := bus.DispatchCtx(ctx, &systemUserCountQuery) err := bus.DispatchCtx(ctx, &systemUserCountQuery)
if err != nil { if err != nil {
......
package sqlstore
import (
"context"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/log"
sqlite3 "github.com/mattn/go-sqlite3"
)
func (ss *SqlStore) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
return ss.inTransactionWithRetry(ctx, fn, 0)
}
func (ss *SqlStore) inTransactionWithRetry(ctx context.Context, fn func(ctx context.Context) error, retry int) error {
sess := startSession(ctx)
defer sess.Close()
withValue := context.WithValue(ctx, ContextSessionName, sess)
err := fn(withValue)
// special handling of database locked errors for sqlite, then we can retry 3 times
if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
if sqlError.Code == sqlite3.ErrLocked {
sess.Rollback()
time.Sleep(time.Millisecond * time.Duration(10))
ss.log.Info("Database table locked, sleeping then retrying", "retry", retry)
return ss.inTransactionWithRetry(ctx, fn, retry+1)
}
}
if err != nil {
sess.Rollback()
return err
}
if err = sess.Commit(); err != nil {
return err
}
if len(sess.events) > 0 {
for _, e := range sess.events {
if err = bus.Publish(e); err != nil {
ss.log.Error("Failed to publish event after commit", err)
}
}
}
return nil
}
func inTransactionWithRetry(callback dbTransactionFunc, retry int) error {
return inTransactionWithRetryCtx(context.Background(), callback, retry)
}
func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
var err error
sess := startSession(ctx)
defer sess.Close()
if err = sess.Begin(); err != nil {
return err
}
err = callback(sess)
// special handling of database locked errors for sqlite, then we can retry 3 times
if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
if sqlError.Code == sqlite3.ErrLocked {
sess.Rollback()
time.Sleep(time.Millisecond * time.Duration(10))
sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
return inTransactionWithRetry(callback, retry+1)
}
}
if err != nil {
sess.Rollback()
return err
} else if err = sess.Commit(); err != nil {
return err
}
if len(sess.events) > 0 {
for _, e := range sess.events {
if err = bus.Publish(e); err != nil {
log.Error(3, "Failed to publish event after commit", err)
}
}
}
return nil
}
func inTransaction(callback dbTransactionFunc) error {
return inTransactionWithRetry(callback, 0)
}
...@@ -31,7 +31,6 @@ func init() { ...@@ -31,7 +31,6 @@ func init() {
bus.AddHandler("sql", DeleteUser) bus.AddHandler("sql", DeleteUser)
bus.AddHandler("sql", UpdateUserPermissions) bus.AddHandler("sql", UpdateUserPermissions)
bus.AddHandler("sql", SetUserHelpFlag) bus.AddHandler("sql", SetUserHelpFlag)
bus.AddCtxHandler("sql", CreateUserCtx) bus.AddCtxHandler("sql", CreateUserCtx)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment