Commit 26357281 by bergquist

replace begin/end with wrapper function

parent 81436100
...@@ -12,9 +12,8 @@ type Msg interface{} ...@@ -12,9 +12,8 @@ type Msg interface{}
var ErrHandlerNotFound = errors.New("handler not found") var ErrHandlerNotFound = errors.New("handler not found")
type TransactionManager interface { type TransactionWrapper interface {
Begin(ctx context.Context) (context.Context, error) Wrapp(ctx context.Context, fn func(ctx context.Context) error) error
End(ctx context.Context, err error) error
} }
type Bus interface { type Bus interface {
...@@ -25,7 +24,7 @@ type Bus interface { ...@@ -25,7 +24,7 @@ type Bus interface {
// InTransaction starts a transaction and store it in the context. // InTransaction starts a transaction and store it in the context.
// The caller can then pass a function with multiple DispatchCtx calls that // The caller can then pass a function with multiple DispatchCtx calls that
// all will be executed in the same transaction. InTransaction will rollback if the // all will be executed in the same transaction. InTransaction will rollback if the
// callback returns an error.s // callback returns an error.
InTransaction(ctx context.Context, fn func(ctx context.Context) error) error InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
AddHandler(handler HandlerFunc) AddHandler(handler HandlerFunc)
...@@ -36,19 +35,11 @@ type Bus interface { ...@@ -36,19 +35,11 @@ type Bus interface {
// SetTransactionManager allows the user to replace the internal // SetTransactionManager allows the user to replace the internal
// noop TransactionManager that is responsible for manageing // noop TransactionManager that is responsible for manageing
// transactions in `InTransaction` // transactions in `InTransaction`
SetTransactionManager(tm TransactionManager) SetTransactionManager(tm TransactionWrapper)
} }
func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
ctxWithTran, err := b.transactionManager.Begin(ctx) return b.transactionWrapper.Wrapp(ctx, fn)
if err != nil {
return err
}
err = fn(ctxWithTran)
b.transactionManager.End(ctxWithTran, err)
return err
} }
type InProcBus struct { type InProcBus struct {
...@@ -56,7 +47,7 @@ type InProcBus struct { ...@@ -56,7 +47,7 @@ type InProcBus struct {
listeners map[string][]HandlerFunc listeners map[string][]HandlerFunc
wildcardListeners []HandlerFunc wildcardListeners []HandlerFunc
transactionManager TransactionManager transactionWrapper TransactionWrapper
} }
// temp stuff, not sure how to handle bus instance, and init yet // temp stuff, not sure how to handle bus instance, and init yet
...@@ -68,7 +59,7 @@ func New() Bus { ...@@ -68,7 +59,7 @@ func New() Bus {
bus.listeners = make(map[string][]HandlerFunc) bus.listeners = make(map[string][]HandlerFunc)
bus.wildcardListeners = make([]HandlerFunc, 0) bus.wildcardListeners = make([]HandlerFunc, 0)
bus.transactionManager = &NoopTransactionManager{} bus.transactionWrapper = &noopTransactionManager{}
return bus return bus
} }
...@@ -78,12 +69,12 @@ func GetBus() Bus { ...@@ -78,12 +69,12 @@ func GetBus() Bus {
return globalBus return globalBus
} }
func SetTransactionManager(tm TransactionManager) { func SetTransactionManager(tm TransactionWrapper) {
globalBus.SetTransactionManager(tm) globalBus.SetTransactionManager(tm)
} }
func (b *InProcBus) SetTransactionManager(tm TransactionManager) { func (b *InProcBus) SetTransactionManager(tm TransactionWrapper) {
b.transactionManager = tm b.transactionWrapper = tm
} }
func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error {
...@@ -208,6 +199,10 @@ func Publish(msg Msg) error { ...@@ -208,6 +199,10 @@ func Publish(msg Msg) error {
return globalBus.Publish(msg) return globalBus.Publish(msg)
} }
// InTransaction starts a transaction and store it in the context.
// The caller can then pass a function with multiple DispatchCtx calls that
// all will be executed in the same transaction. InTransaction will rollback if the
// callback returns an error.
func InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { func InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
return globalBus.InTransaction(ctx, fn) return globalBus.InTransaction(ctx, fn)
} }
...@@ -216,7 +211,8 @@ func ClearBusHandlers() { ...@@ -216,7 +211,8 @@ func ClearBusHandlers() {
globalBus = New() globalBus = New()
} }
type NoopTransactionManager struct{} type noopTransactionManager struct{}
func (*NoopTransactionManager) Begin(ctx context.Context) (context.Context, error) { return ctx, nil } func (*noopTransactionManager) Wrapp(ctx context.Context, fn func(ctx context.Context) error) error {
func (*NoopTransactionManager) End(ctx context.Context, err error) error { return err } return nil
}
...@@ -32,17 +32,16 @@ func inTransaction(callback dbTransactionFunc) error { ...@@ -32,17 +32,16 @@ func inTransaction(callback dbTransactionFunc) error {
func startSession(ctx context.Context) *DBSession { func startSession(ctx context.Context) *DBSession {
value := ctx.Value(ContextSessionName) value := ctx.Value(ContextSessionName)
var sess *xorm.Session var sess *DBSession
sess, ok := value.(*xorm.Session) sess, ok := value.(*DBSession)
if !ok { if !ok {
return newSession() newSess := newSession()
newSess.Begin()
return newSess
} }
old := newSession() return sess
old.Session = sess
return old
} }
func withDbSession(ctx context.Context, callback dbTransactionFunc) error { func withDbSession(ctx context.Context, callback dbTransactionFunc) error {
......
...@@ -2,7 +2,6 @@ package sqlstore ...@@ -2,7 +2,6 @@ package sqlstore
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/url" "net/url"
"os" "os"
...@@ -26,6 +25,7 @@ import ( ...@@ -26,6 +25,7 @@ import (
"github.com/go-xorm/xorm" "github.com/go-xorm/xorm"
_ "github.com/lib/pq" _ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
sqlite3 "github.com/mattn/go-sqlite3"
_ "github.com/grafana/grafana/pkg/tsdb/mssql" _ "github.com/grafana/grafana/pkg/tsdb/mssql"
) )
...@@ -94,37 +94,49 @@ func (ss *SqlStore) Init() error { ...@@ -94,37 +94,49 @@ func (ss *SqlStore) Init() error {
return ss.ensureAdminUser() return ss.ensureAdminUser()
} }
// SQLTransactionManager begin/end transaction
type SQLTransactionManager struct { type SQLTransactionManager struct {
engine *xorm.Engine engine *xorm.Engine
} }
func (stm *SQLTransactionManager) Begin(ctx context.Context) (context.Context, error) { func (stm *SQLTransactionManager) Wrapp(ctx context.Context, fn func(ctx context.Context) error) error {
sess := stm.engine.NewSession() return stm.wrappInternal(ctx, fn, 0)
err := sess.Begin() }
if err != nil {
return ctx, err func (stm *SQLTransactionManager) wrappInternal(ctx context.Context, fn func(ctx context.Context) error, retry int) error {
} sess := startSession(ctx)
defer sess.Close()
withValue := context.WithValue(ctx, ContextSessionName, sess) withValue := context.WithValue(ctx, ContextSessionName, sess)
return withValue, nil err := fn(withValue)
}
func (stm *SQLTransactionManager) End(ctx context.Context, err error) error { // special handling of database locked errors for sqlite, then we can retry 3 times
value := ctx.Value(ContextSessionName) if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
sess, ok := value.(*xorm.Session) if sqlError.Code == sqlite3.ErrLocked {
if !ok { sess.Rollback()
return errors.New("context is missing transaction") time.Sleep(time.Millisecond * time.Duration(10))
sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
return stm.wrappInternal(ctx, fn, retry+1)
}
} }
if err != nil { if err != nil {
sess.Rollback() sess.Rollback()
return err return err
} else if err = sess.Commit(); err != nil {
return err
} }
defer sess.Close() if len(sess.events) > 0 {
for _, e := range sess.events {
if err = bus.Publish(e); err != nil {
log.Error(3, "Failed to publish event after commit", err)
}
}
}
return sess.Commit() return nil
} }
func (ss *SqlStore) ensureAdminUser() error { func (ss *SqlStore) ensureAdminUser() error {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment