Commit 246f41b8 by Torkel Ödegaard

Bus experiment

parent a799fac9
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"os" "os"
"runtime" "runtime"
"github.com/torkelo/grafana-pro/pkg/bus"
"github.com/torkelo/grafana-pro/pkg/cmd" "github.com/torkelo/grafana-pro/pkg/cmd"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
...@@ -16,6 +17,8 @@ func init() { ...@@ -16,6 +17,8 @@ func init() {
} }
func main() { func main() {
bus.InitBus()
app := cli.NewApp() app := cli.NewApp()
app.Name = "Grafana Pro" app.Name = "Grafana Pro"
app.Usage = "Grafana Pro Service" app.Usage = "Grafana Pro Service"
......
package bus
import (
"errors"
"fmt"
"reflect"
)
type QueryHandler interface{}
type Query interface{}
var (
handlerIndex map[string]QueryHandler
)
func InitBus() {
handlerIndex = make(map[string]QueryHandler)
}
func SendQuery(query interface{}) error {
var queryName = reflect.TypeOf(query).Elem().Name()
fmt.Printf("sending query for type: %v\n", queryName)
var handler = handlerIndex[queryName]
if handler == nil {
return errors.New("handler not found")
}
var params = make([]reflect.Value, 1)
params[0] = reflect.ValueOf(query)
ret := reflect.ValueOf(handler).Call(params)
err := ret[0].Interface()
if err == nil {
return nil
} else {
return err.(error)
}
}
func AddQueryHandler(handler QueryHandler) {
handlerType := reflect.TypeOf(handler)
queryTypeName := handlerType.In(0).Elem().Name()
fmt.Printf("QueryType %v\n", queryTypeName)
handlerIndex[queryTypeName] = handler
//fmt.Printf("Adding handler for type: %v\n", queryTypeName)
}
package bus
import (
"errors"
"testing"
)
type TestQuery struct {
Id int64
Resp string
}
func TestHandlerReturnsError(t *testing.T) {
InitBus()
AddQueryHandler(func(query *TestQuery) error {
return errors.New("handler error")
})
err := SendQuery(&TestQuery{})
if err == nil {
t.Fatal("Send query failed %v", err)
} else {
t.Log("Handler error received ok")
}
}
func TestHandlerReturn(t *testing.T) {
InitBus()
AddQueryHandler(func(q *TestQuery) error {
q.Resp = "hello from handler"
return nil
})
query := &TestQuery{}
err := SendQuery(query)
if err != nil {
t.Fatal("Send query failed %v", err)
} else if query.Resp != "hello from handler" {
t.Fatal("Failed to get response from handler")
}
}
package rethink
import (
"errors"
"time"
r "github.com/dancannon/gorethink"
"github.com/torkelo/grafana-pro/pkg/log"
"github.com/torkelo/grafana-pro/pkg/models"
)
var (
session *r.Session
dbName string = "grafana"
)
func Init() {
log.Info("Initializing rethink storage")
var err error
session, err = r.Connect(r.ConnectOpts{
Address: "localhost:28015",
Database: dbName,
MaxIdle: 10,
IdleTimeout: time.Second * 10,
})
if err != nil {
log.Error(3, "Failed to connect to rethink database %v", err)
}
createRethinkDBTablesAndIndices()
models.GetAccount = GetAccount
models.GetAccountByLogin = GetAccountByLogin
models.GetDashboard = GetDashboard
models.SearchQuery = SearchQuery
models.DeleteDashboard = DeleteDashboard
models.SaveDashboard = SaveDashboard
}
func createRethinkDBTablesAndIndices() {
r.DbCreate(dbName).Exec(session)
// create tables
r.Db(dbName).TableCreate("dashboards").Exec(session)
r.Db(dbName).TableCreate("accounts").Exec(session)
r.Db(dbName).TableCreate("master").Exec(session)
// create dashboard accountId + slug index
r.Db(dbName).Table("dashboards").IndexCreateFunc("AccountIdSlug", func(row r.Term) interface{} {
return []interface{}{row.Field("AccountId"), row.Field("Slug")}
}).Exec(session)
r.Db(dbName).Table("dashboards").IndexCreate("AccountId").Exec(session)
r.Db(dbName).Table("accounts").IndexCreate("Login").Exec(session)
// create account collaborator index
r.Db(dbName).Table("accounts").
IndexCreateFunc("CollaboratorAccountId", func(row r.Term) interface{} {
return row.Field("Collaborators").Map(func(row r.Term) interface{} {
return row.Field("AccountId")
})
}, r.IndexCreateOpts{Multi: true}).Exec(session)
// make sure master ids row exists
_, err := r.Table("master").Insert(map[string]interface{}{"id": "ids", "NextAccountId": 0}).RunWrite(session)
if err != nil {
log.Error(3, "Failed to insert master ids row", err)
}
}
func getNextAccountId() (int, error) {
resp, err := r.Table("master").Get("ids").Update(map[string]interface{}{
"NextAccountId": r.Row.Field("NextAccountId").Add(1),
}, r.UpdateOpts{ReturnChanges: true}).RunWrite(session)
if err != nil {
return 0, err
}
change := resp.Changes[0]
if change.NewValue == nil {
return 0, errors.New("Failed to get new value after incrementing account id")
}
return int(change.NewValue.(map[string]interface{})["NextAccountId"].(float64)), nil
}
func CreateAccount(account *models.Account) error {
accountId, err := getNextAccountId()
if err != nil {
return err
}
account.Id = accountId
account.UsingAccountId = accountId
resp, err := r.Table("accounts").Insert(account).RunWrite(session)
if err != nil {
return err
}
if resp.Inserted == 0 {
return errors.New("Failed to insert acccount")
}
return nil
}
func GetAccountByLogin(emailOrName string) (*models.Account, error) {
resp, err := r.Table("accounts").GetAllByIndex("Login", emailOrName).Run(session)
if err != nil {
return nil, err
}
var account models.Account
err = resp.One(&account)
if err != nil {
return nil, models.ErrAccountNotFound
}
return &account, nil
}
func GetAccount(id int) (*models.Account, error) {
resp, err := r.Table("accounts").Get(id).Run(session)
if err != nil {
return nil, err
}
var account models.Account
err = resp.One(&account)
if err != nil {
return nil, errors.New("Not found")
}
return &account, nil
}
func UpdateAccount(account *models.Account) error {
resp, err := r.Table("accounts").Update(account).RunWrite(session)
if err != nil {
return err
}
if resp.Replaced == 0 && resp.Unchanged == 0 {
return errors.New("Could not find account to update")
}
return nil
}
func getNextDashboardNumber(accountId int) (int, error) {
resp, err := r.Table("accounts").Get(accountId).Update(map[string]interface{}{
"NextDashboardId": r.Row.Field("NextDashboardId").Add(1),
}, r.UpdateOpts{ReturnChanges: true}).RunWrite(session)
if err != nil {
return 0, err
}
change := resp.Changes[0]
if change.NewValue == nil {
return 0, errors.New("Failed to get next dashboard id, no new value after update")
}
return int(change.NewValue.(map[string]interface{})["NextDashboardId"].(float64)), nil
}
func GetOtherAccountsFor(accountId int) ([]*models.OtherAccount, error) {
resp, err := r.Table("accounts").
GetAllByIndex("CollaboratorAccountId", accountId).
Map(func(row r.Term) interface{} {
return map[string]interface{}{
"id": row.Field("id"),
"Name": row.Field("Email"),
"Role": row.Field("Collaborators").Filter(map[string]interface{}{
"AccountId": accountId,
}).Nth(0).Field("Role"),
}
}).Run(session)
if err != nil {
return nil, err
}
var list []*models.OtherAccount
err = resp.All(&list)
if err != nil {
return nil, errors.New("Failed to read available accounts")
}
return list, nil
}
package rethink
import (
"errors"
r "github.com/dancannon/gorethink"
"github.com/torkelo/grafana-pro/pkg/log"
"github.com/torkelo/grafana-pro/pkg/models"
)
func SaveDashboard(dash *models.Dashboard) error {
resp, err := r.Table("dashboards").Insert(dash, r.InsertOpts{Conflict: "update"}).RunWrite(session)
if err != nil {
return err
}
log.Info("Inserted: %v, Errors: %v, Updated: %v", resp.Inserted, resp.Errors, resp.Updated)
log.Info("First error:", resp.FirstError)
if len(resp.GeneratedKeys) > 0 {
dash.Id = resp.GeneratedKeys[0]
}
return nil
}
func GetDashboard(slug string, accountId int) (*models.Dashboard, error) {
resp, err := r.Table("dashboards").
GetAllByIndex("AccountIdSlug", []interface{}{accountId, slug}).
Run(session)
if err != nil {
return nil, err
}
var dashboard models.Dashboard
err = resp.One(&dashboard)
if err != nil {
return nil, err
}
return &dashboard, nil
}
func DeleteDashboard(slug string, accountId int) error {
resp, err := r.Table("dashboards").
GetAllByIndex("AccountIdSlug", []interface{}{accountId, slug}).
Delete().RunWrite(session)
if err != nil {
return err
}
if resp.Deleted != 1 {
return errors.New("Did not find dashboard to delete")
}
return nil
}
func SearchQuery(query string, accountId int) ([]*models.SearchResult, error) {
docs, err := r.Table("dashboards").
GetAllByIndex("AccountId", []interface{}{accountId}).
Filter(r.Row.Field("Title").Match(".*")).Run(session)
if err != nil {
return nil, err
}
results := make([]*models.SearchResult, 0, 50)
var dashboard models.Dashboard
for docs.Next(&dashboard) {
results = append(results, &models.SearchResult{
Title: dashboard.Title,
Id: dashboard.Slug,
})
}
return results, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment