Commit e2c794ff by Torkel Ödegaard

feat(instrumentation): lots of refactoring to support tag based backend, #4696

parent 2a9b51d8
...@@ -338,3 +338,24 @@ global_api_key = -1 ...@@ -338,3 +338,24 @@ global_api_key = -1
# global limit on number of logged in users. # global limit on number of logged in users.
global_session = -1 global_session = -1
#################################### Internal Grafana Metrics ##########################
[metrics]
enabled = false
interval_seconds = 10
[metrics.graphite]
address = localhost:2003
prefix = prod.grafana.%(instance_name)s.
[metrics.influxdb]
url = http://localhost:8086
database = site
prefix =
username = grafana
password = grafana
[metrics.influxdb.tags]
hostname = ${HOSTNAME}
service = Grafana
...@@ -77,8 +77,6 @@ func GetDashboard(c *middleware.Context) { ...@@ -77,8 +77,6 @@ func GetDashboard(c *middleware.Context) {
} }
c.JSON(200, dto) c.JSON(200, dto)
metrics.M_Api_Dashboard_Get_Timer.AddTiming(123333)
} }
func getUserLogin(userId int64) string { func getUserLogin(userId int64) string {
......
package metrics
import "github.com/grafana/grafana/pkg/log"
type MetricMeta struct {
tags map[string]string
name string
}
func NewMetricMeta(name string, tagStrings []string) *MetricMeta {
if len(tagStrings)%2 != 0 {
log.Fatal(3, "Metrics: tags array is missing value for key, %v", tagStrings)
}
tags := make(map[string]string)
for i := 0; i < len(tagStrings); i += 2 {
tags[tagStrings[i]] = tagStrings[i+1]
}
return &MetricMeta{
tags: tags,
name: name,
}
}
func (m *MetricMeta) Name() string {
return m.name
}
func (m *MetricMeta) Tags() map[string]string {
return m.tags
}
func (m *MetricMeta) StringifyTags() string {
if len(m.tags) == 0 {
return ""
}
str := ""
for key, value := range m.tags {
str += "." + key + "_" + value
}
return str
}
type Metric interface {
Name() string
Tags() map[string]string
StringifyTags() string
Snapshot() Metric
Clear()
}
...@@ -4,45 +4,26 @@ import "sync/atomic" ...@@ -4,45 +4,26 @@ import "sync/atomic"
// Counters hold an int64 value that can be incremented and decremented. // Counters hold an int64 value that can be incremented and decremented.
type Counter interface { type Counter interface {
Clear() Metric
Count() int64 Count() int64
Dec(int64) Dec(int64)
Inc(int64) Inc(int64)
Snapshot() Counter
} }
// NewCounter constructs a new StandardCounter. // NewCounter constructs a new StandardCounter.
func NewCounter() Counter { func NewCounter(meta *MetricMeta) Counter {
return &StandardCounter{0} return &StandardCounter{
} MetricMeta: meta,
count: 0,
// CounterSnapshot is a read-only copy of another Counter. }
type CounterSnapshot int64
// Clear panics.
func (CounterSnapshot) Clear() {
panic("Clear called on a CounterSnapshot")
} }
// Count returns the count at the time the snapshot was taken.
func (c CounterSnapshot) Count() int64 { return int64(c) }
// Dec panics.
func (CounterSnapshot) Dec(int64) {
panic("Dec called on a CounterSnapshot")
}
// Inc panics.
func (CounterSnapshot) Inc(int64) {
panic("Inc called on a CounterSnapshot")
}
// Snapshot returns the snapshot.
func (c CounterSnapshot) Snapshot() Counter { return c }
// StandardCounter is the standard implementation of a Counter and uses the // StandardCounter is the standard implementation of a Counter and uses the
// sync/atomic package to manage a single int64 value. // sync/atomic package to manage a single int64 value.
type StandardCounter struct { type StandardCounter struct {
*MetricMeta
count int64 count int64
} }
...@@ -66,7 +47,9 @@ func (c *StandardCounter) Inc(i int64) { ...@@ -66,7 +47,9 @@ func (c *StandardCounter) Inc(i int64) {
atomic.AddInt64(&c.count, i) atomic.AddInt64(&c.count, i)
} }
// Snapshot returns a read-only copy of the counter. func (c *StandardCounter) Snapshot() Metric {
func (c *StandardCounter) Snapshot() Counter { return &StandardCounter{
return CounterSnapshot(c.Count()) MetricMeta: c.MetricMeta,
count: c.count,
}
} }
package publishers package metrics
import ( import (
"bytes" "bytes"
...@@ -30,7 +30,7 @@ func CreateGraphitePublisher() (*GraphitePublisher, error) { ...@@ -30,7 +30,7 @@ func CreateGraphitePublisher() (*GraphitePublisher, error) {
return publisher, nil return publisher, nil
} }
func (this *GraphitePublisher) Publish(metrics map[string]interface{}) { func (this *GraphitePublisher) Publish(metrics []Metric) {
conn, err := net.DialTimeout(this.Protocol, this.Address, time.Second*5) conn, err := net.DialTimeout(this.Protocol, this.Address, time.Second*5)
if err != nil { if err != nil {
...@@ -40,11 +40,18 @@ func (this *GraphitePublisher) Publish(metrics map[string]interface{}) { ...@@ -40,11 +40,18 @@ func (this *GraphitePublisher) Publish(metrics map[string]interface{}) {
buf := bytes.NewBufferString("") buf := bytes.NewBufferString("")
now := time.Now().Unix() now := time.Now().Unix()
for key, value := range metrics { for _, m := range metrics {
metricName := this.Prefix + key metricName := this.Prefix + m.Name() + m.StringifyTags()
line := fmt.Sprintf("%s %d %d\n", metricName, value, now)
switch metric := m.(type) {
case Counter:
if metric.Count() > 0 {
line := fmt.Sprintf("%s %d %d\n", metricName, metric.Count(), now)
buf.WriteString(line) buf.WriteString(line)
} }
}
}
log.Trace("Metrics: GraphitePublisher.Publish() \n%s", buf) log.Trace("Metrics: GraphitePublisher.Publish() \n%s", buf)
_, err = conn.Write(buf.Bytes()) _, err = conn.Write(buf.Bytes())
......
package publishers package metrics
import ( import (
"net/url" "net/url"
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
type InfluxPublisher struct { type InfluxPublisher struct {
database string database string
tags map[string]string tags map[string]string
prefix string
client *client.Client client *client.Client
} }
...@@ -34,6 +35,8 @@ func CreateInfluxPublisher() (*InfluxPublisher, error) { ...@@ -34,6 +35,8 @@ func CreateInfluxPublisher() (*InfluxPublisher, error) {
} }
publisher.database = influxSection.Key("database").MustString("grafana_metrics") publisher.database = influxSection.Key("database").MustString("grafana_metrics")
publisher.prefix = influxSection.Key("prefix").MustString("prefix")
username := influxSection.Key("User").MustString("grafana") username := influxSection.Key("User").MustString("grafana")
password := influxSection.Key("Password").MustString("grafana") password := influxSection.Key("Password").MustString("grafana")
...@@ -60,7 +63,7 @@ func CreateInfluxPublisher() (*InfluxPublisher, error) { ...@@ -60,7 +63,7 @@ func CreateInfluxPublisher() (*InfluxPublisher, error) {
return publisher, nil return publisher, nil
} }
func (this *InfluxPublisher) Publish(metrics map[string]interface{}) { func (this *InfluxPublisher) Publish(metrics []Metric) {
bp := client.BatchPoints{ bp := client.BatchPoints{
Time: time.Now(), Time: time.Now(),
Database: this.database, Database: this.database,
...@@ -71,13 +74,19 @@ func (this *InfluxPublisher) Publish(metrics map[string]interface{}) { ...@@ -71,13 +74,19 @@ func (this *InfluxPublisher) Publish(metrics map[string]interface{}) {
bp.Tags[key] = value bp.Tags[key] = value
} }
for key, value := range metrics { for _, m := range metrics {
bp.Points = append(bp.Points, client.Point{ point := client.Point{
Measurement: key, Measurement: this.prefix + m.Name(),
Fields: map[string]interface{}{ Tags: m.Tags(),
"value": value, }
},
}) switch metric := m.(type) {
case Counter:
if metric.Count() > 0 {
point.Fields = map[string]interface{}{"value": metric.Count()}
bp.Points = append(bp.Points, point)
}
}
} }
_, err := this.client.Write(bp) _, err := this.client.Write(bp)
......
package metrics package metrics
type comboCounterRef struct { type comboCounterRef struct {
*MetricMeta
usageCounter Counter usageCounter Counter
metricCounter Counter metricCounter Counter
} }
type comboTimerRef struct { type comboTimerRef struct {
*MetricMeta
usageTimer Timer usageTimer Timer
metricTimer Timer metricTimer Timer
} }
func NewComboCounterRef(name string) Counter { func RegComboCounter(name string, tagStrings ...string) Counter {
cr := &comboCounterRef{} meta := NewMetricMeta(name, tagStrings)
cr.usageCounter = UsageStats.GetOrRegister(name, NewCounter).(Counter) cr := &comboCounterRef{
cr.metricCounter = MetricStats.GetOrRegister(name, NewCounter).(Counter) MetricMeta: meta,
usageCounter: NewCounter(meta),
metricCounter: NewCounter(meta),
}
UsageStats.Register(cr.usageCounter)
MetricStats.Register(cr.metricCounter)
return cr return cr
} }
func NewComboTimerRef(name string) Timer { // func NewComboTimerRef(name string, tagStrings ...string) Timer {
tr := &comboTimerRef{} // meta := NewMetricMeta(name, tagStrings)
tr.usageTimer = UsageStats.GetOrRegister(name, NewTimer).(Timer) // tr := &comboTimerRef{}
tr.metricTimer = MetricStats.GetOrRegister(name, NewTimer).(Timer) // tr.usageTimer = UsageStats.GetOrRegister(NewTimer).(Timer)
return tr // tr.metricTimer = MetricStats.GetOrRegister(NewTimer).(Timer)
} // return tr
// }
func (t comboTimerRef) Clear() { func (t comboTimerRef) Clear() {
t.metricTimer.Clear() t.metricTimer.Clear()
...@@ -71,7 +81,6 @@ func (c comboCounterRef) Inc(i int64) { ...@@ -71,7 +81,6 @@ func (c comboCounterRef) Inc(i int64) {
c.metricCounter.Inc(i) c.metricCounter.Inc(i)
} }
// Snapshot returns the snapshot. func (c comboCounterRef) Snapshot() Metric {
func (c comboCounterRef) Snapshot() Counter { return c.metricCounter.Snapshot()
panic("snapshot called on a combocounter ref")
} }
...@@ -4,31 +4,31 @@ var UsageStats = NewRegistry() ...@@ -4,31 +4,31 @@ var UsageStats = NewRegistry()
var MetricStats = NewRegistry() var MetricStats = NewRegistry()
var ( var (
M_Instance_Start = NewComboCounterRef("instance.start") M_Instance_Start = RegComboCounter("instance_start")
M_Page_Status_200 = NewComboCounterRef("page.status.200") M_Page_Status_200 = RegComboCounter("page_resp_status", "code", "200")
M_Page_Status_500 = NewComboCounterRef("page.status.500") M_Page_Status_500 = RegComboCounter("page_resp_status", "code", "500")
M_Page_Status_404 = NewComboCounterRef("page.status.404") M_Page_Status_404 = RegComboCounter("page_resp_status", "code", "404")
M_Api_Status_500 = NewComboCounterRef("api.status.500") M_Api_Status_500 = RegComboCounter("api_resp_status", "code", "500")
M_Api_Status_404 = NewComboCounterRef("api.status.404") M_Api_Status_404 = RegComboCounter("api_resp_status", "code", "404")
M_Api_User_SignUpStarted = NewComboCounterRef("api.user.signup_started") M_Api_User_SignUpStarted = RegComboCounter("api.user.signup_started")
M_Api_User_SignUpCompleted = NewComboCounterRef("api.user.signup_completed") M_Api_User_SignUpCompleted = RegComboCounter("api.user.signup_completed")
M_Api_User_SignUpInvite = NewComboCounterRef("api.user.signup_invite") M_Api_User_SignUpInvite = RegComboCounter("api.user.signup_invite")
M_Api_Dashboard_Get = NewComboCounterRef("api.dashboard.get") M_Api_Dashboard_Get = RegComboCounter("api.dashboard.get")
M_Api_Dashboard_Get_Timer = NewComboTimerRef("api.dashboard_load") // M_Api_Dashboard_Get_Timer = NewComboTimerRef("api.dashboard_load")
M_Api_Dashboard_Post = NewComboCounterRef("api.dashboard.post") M_Api_Dashboard_Post = RegComboCounter("api.dashboard.post")
M_Api_Admin_User_Create = NewComboCounterRef("api.admin.user_create") M_Api_Admin_User_Create = RegComboCounter("api.admin.user_create")
M_Api_Login_Post = NewComboCounterRef("api.login.post") M_Api_Login_Post = RegComboCounter("api.login.post")
M_Api_Login_OAuth = NewComboCounterRef("api.login.oauth") M_Api_Login_OAuth = RegComboCounter("api.login.oauth")
M_Api_Org_Create = NewComboCounterRef("api.org.create") M_Api_Org_Create = RegComboCounter("api.org.create")
M_Api_Dashboard_Snapshot_Create = NewComboCounterRef("api.dashboard_snapshot.create") M_Api_Dashboard_Snapshot_Create = RegComboCounter("api.dashboard_snapshot.create")
M_Api_Dashboard_Snapshot_External = NewComboCounterRef("api.dashboard_snapshot.external") M_Api_Dashboard_Snapshot_External = RegComboCounter("api.dashboard_snapshot.external")
M_Api_Dashboard_Snapshot_Get = NewComboCounterRef("api.dashboard_snapshot.get") M_Api_Dashboard_Snapshot_Get = RegComboCounter("api.dashboard_snapshot.get")
M_Models_Dashboard_Insert = NewComboCounterRef("models.dashboard.insert") M_Models_Dashboard_Insert = RegComboCounter("models.dashboard.insert")
) )
...@@ -39,25 +39,7 @@ func instrumentationLoop() chan struct{} { ...@@ -39,25 +39,7 @@ func instrumentationLoop() chan struct{} {
} }
func sendMetrics(settings *MetricSettings) { func sendMetrics(settings *MetricSettings) {
metrics := map[string]interface{}{} metrics := MetricStats.GetSnapshots()
MetricStats.Each(func(name string, i interface{}) {
switch metric := i.(type) {
case Counter:
if metric.Count() > 0 {
metrics[name+".count"] = metric.Count()
metric.Clear()
}
case Timer:
if metric.Total() > 0 {
metrics[name+".avg"] = metric.Avg()
metrics[name+".min"] = metric.Min()
metrics[name+".max"] = metric.Max()
metrics[name+".total"] = metric.Total()
metric.Clear()
}
}
})
for _, publisher := range settings.Publishers { for _, publisher := range settings.Publishers {
publisher.Publish(metrics) publisher.Publish(metrics)
...@@ -79,15 +61,15 @@ func sendUsageStats() { ...@@ -79,15 +61,15 @@ func sendUsageStats() {
"metrics": metrics, "metrics": metrics,
} }
UsageStats.Each(func(name string, i interface{}) { snapshots := UsageStats.GetSnapshots()
switch metric := i.(type) { for _, m := range snapshots {
switch metric := m.(type) {
case Counter: case Counter:
if metric.Count() > 0 { if metric.Count() > 0 {
metrics[name+".count"] = metric.Count() metrics[metric.Name()+".count"] = metric.Count()
metric.Clear() }
} }
} }
})
statsQuery := m.GetSystemStatsQuery{} statsQuery := m.GetSystemStatsQuery{}
if err := bus.Dispatch(&statsQuery); err != nil { if err := bus.Dispatch(&statsQuery); err != nil {
......
package metrics package metrics
import ( import "sync"
"fmt"
"reflect"
"sync"
)
// DuplicateMetric is the error returned by Registry.Register when a metric
// already exists. If you mean to Register that metric you must first
// Unregister the existing metric.
type DuplicateMetric string
func (err DuplicateMetric) Error() string {
return fmt.Sprintf("duplicate metric: %s", string(err))
}
type Registry interface { type Registry interface {
// Call the given function for each registered metric. GetSnapshots() []Metric
Each(func(string, interface{})) Register(metric Metric)
// Get the metric by the given name or nil if none is registered.
Get(string) interface{}
// Gets an existing metric or registers the given one.
// The interface can be the metric to register if not found in registry,
// or a function returning the metric for lazy instantiation.
GetOrRegister(string, interface{}) interface{}
// Register the given metric under the given name.
Register(string, interface{}) error
} }
// The standard implementation of a Registry is a mutex-protected map // The standard implementation of a Registry is a mutex-protected map
// of names to metrics. // of names to metrics.
type StandardRegistry struct { type StandardRegistry struct {
metrics map[string]interface{} metrics []Metric
mutex sync.Mutex mutex sync.Mutex
} }
// Create a new registry. // Create a new registry.
func NewRegistry() Registry { func NewRegistry() Registry {
return &StandardRegistry{metrics: make(map[string]interface{})} return &StandardRegistry{
} metrics: make([]Metric, 0),
// Call the given function for each registered metric.
func (r *StandardRegistry) Each(f func(string, interface{})) {
for name, i := range r.registered() {
f(name, i)
}
}
// Get the metric by the given name or nil if none is registered.
func (r *StandardRegistry) Get(name string) interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.metrics[name]
}
// Gets an existing metric or creates and registers a new one. Threadsafe
// alternative to calling Get and Register on failure.
// The interface can be the metric to register if not found in registry,
// or a function returning the metric for lazy instantiation.
func (r *StandardRegistry) GetOrRegister(name string, i interface{}) interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
if metric, ok := r.metrics[name]; ok {
return metric
}
if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
i = v.Call(nil)[0].Interface()
} }
r.register(name, i)
return i
} }
// Register the given metric under the given name. Returns a DuplicateMetric func (r *StandardRegistry) Register(metric Metric) {
// if a metric by the given name is already registered.
func (r *StandardRegistry) Register(name string, i interface{}) error {
r.mutex.Lock() r.mutex.Lock()
defer r.mutex.Unlock() defer r.mutex.Unlock()
return r.register(name, i) r.metrics = append(r.metrics, metric)
} }
func (r *StandardRegistry) register(name string, i interface{}) error { // Call the given function for each registered metric.
if _, ok := r.metrics[name]; ok { func (r *StandardRegistry) GetSnapshots() []Metric {
return DuplicateMetric(name) metrics := make([]Metric, len(r.metrics))
} for i, metric := range r.metrics {
metrics[i] = metric.Snapshot()
r.metrics[name] = i metric.Clear()
return nil
}
func (r *StandardRegistry) registered() map[string]interface{} {
metrics := make(map[string]interface{}, len(r.metrics))
r.mutex.Lock()
defer r.mutex.Unlock()
for name, i := range r.metrics {
metrics[name] = i
} }
return metrics return metrics
} }
...@@ -2,12 +2,11 @@ package metrics ...@@ -2,12 +2,11 @@ package metrics
import ( import (
"github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/metrics/publishers"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
) )
type MetricPublisher interface { type MetricPublisher interface {
Publish(metrics map[string]interface{}) Publish(metrics []Metric)
} }
type MetricSettings struct { type MetricSettings struct {
...@@ -36,14 +35,14 @@ func readSettings() *MetricSettings { ...@@ -36,14 +35,14 @@ func readSettings() *MetricSettings {
return settings return settings
} }
if graphitePublisher, err := publishers.CreateGraphitePublisher(); err != nil { if graphitePublisher, err := CreateGraphitePublisher(); err != nil {
log.Error(3, "Metrics: Failed to init Graphite metric publisher", err) log.Error(3, "Metrics: Failed to init Graphite metric publisher", err)
} else if graphitePublisher != nil { } else if graphitePublisher != nil {
log.Info("Metrics: Internal metrics publisher Graphite initialized") log.Info("Metrics: Internal metrics publisher Graphite initialized")
settings.Publishers = append(settings.Publishers, graphitePublisher) settings.Publishers = append(settings.Publishers, graphitePublisher)
} }
if influxPublisher, err := publishers.CreateInfluxPublisher(); err != nil { if influxPublisher, err := CreateInfluxPublisher(); err != nil {
log.Error(3, "Metrics: Failed to init InfluxDB metric publisher", err) log.Error(3, "Metrics: Failed to init InfluxDB metric publisher", err)
} else if influxPublisher != nil { } else if influxPublisher != nil {
log.Info("Metrics: Internal metrics publisher InfluxDB initialized") log.Info("Metrics: Internal metrics publisher InfluxDB initialized")
......
...@@ -3,16 +3,28 @@ package metrics ...@@ -3,16 +3,28 @@ package metrics
//import "sync/atomic" //import "sync/atomic"
type Timer interface { type Timer interface {
Metric
AddTiming(int64) AddTiming(int64)
Clear()
Avg() int64 Avg() int64
Min() int64 Min() int64
Max() int64 Max() int64
Total() int64 Count() int64
}
type StandardTimer struct {
*MetricMeta
total int64
count int64
avg int64
min int64
max int64
} }
func NewTimer() Timer { func NewTimer(meta *MetricMeta) Timer {
return &StandardTimer{ return &StandardTimer{
MetricMeta: meta,
avg: 0, avg: 0,
min: 0, min: 0,
max: 0, max: 0,
...@@ -56,14 +68,17 @@ func (this *StandardTimer) Max() int64 { ...@@ -56,14 +68,17 @@ func (this *StandardTimer) Max() int64 {
return this.max return this.max
} }
func (this *StandardTimer) Total() int64 { func (this *StandardTimer) Count() int64 {
return this.total return this.count
} }
type StandardTimer struct { func (this *StandardTimer) Snapshot() Metric {
total int64 return &StandardTimer{
count int64 MetricMeta: this.MetricMeta,
avg int64 avg: this.avg,
min int64 min: this.min,
max int64 max: this.max,
total: this.total,
count: this.count,
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment