Commit 428ca600 by Marcus Olsson Committed by GitHub

Chore: Refactor grafana-server (#19796)

* Rename GrafanaServerImpl to Server

* Make flag dependencies explicit

* Extract method for building service graph

* Document and clarify methods

* Rename HttpServer to HTTPServer
parent 6f7f9aa7
...@@ -37,16 +37,20 @@ var commit = "NA" ...@@ -37,16 +37,20 @@ var commit = "NA"
var buildBranch = "master" var buildBranch = "master"
var buildstamp string var buildstamp string
var configFile = flag.String("config", "", "path to config file")
var homePath = flag.String("homepath", "", "path to grafana install/home path, defaults to working directory")
var pidFile = flag.String("pidfile", "", "path to pid file")
var packaging = flag.String("packaging", "unknown", "describes the way Grafana was installed")
func main() { func main() {
v := flag.Bool("v", false, "prints current version and exits") var (
profile := flag.Bool("profile", false, "Turn on pprof profiling") configFile = flag.String("config", "", "path to config file")
profilePort := flag.Int("profile-port", 6060, "Define custom port for profiling") homePath = flag.String("homepath", "", "path to grafana install/home path, defaults to working directory")
pidFile = flag.String("pidfile", "", "path to pid file")
packaging = flag.String("packaging", "unknown", "describes the way Grafana was installed")
v = flag.Bool("v", false, "prints current version and exits")
profile = flag.Bool("profile", false, "Turn on pprof profiling")
profilePort = flag.Int("profile-port", 6060, "Define custom port for profiling")
)
flag.Parse() flag.Parse()
if *v { if *v {
fmt.Printf("Version %s (commit: %s, branch: %s)\n", version, commit, buildBranch) fmt.Printf("Version %s (commit: %s, branch: %s)\n", version, commit, buildBranch)
os.Exit(0) os.Exit(0)
...@@ -88,13 +92,13 @@ func main() { ...@@ -88,13 +92,13 @@ func main() {
metrics.SetBuildInformation(version, commit, buildBranch) metrics.SetBuildInformation(version, commit, buildBranch)
server := NewGrafanaServer() server := NewServer(*configFile, *homePath, *pidFile)
go listenToSystemSignals(server) go listenToSystemSignals(server)
err := server.Run() err := server.Run()
code := server.Exit(err) code := server.ExitCode(err)
trace.Stop() trace.Stop()
log.Close() log.Close()
...@@ -111,7 +115,7 @@ func validPackaging(packaging string) string { ...@@ -111,7 +115,7 @@ func validPackaging(packaging string) string {
return "unknown" return "unknown"
} }
func listenToSystemSignals(server *GrafanaServerImpl) { func listenToSystemSignals(server *Server) {
signalChan := make(chan os.Signal, 1) signalChan := make(chan os.Signal, 1)
sighupChan := make(chan os.Signal, 1) sighupChan := make(chan os.Signal, 1)
......
...@@ -41,20 +41,26 @@ import ( ...@@ -41,20 +41,26 @@ import (
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
) )
func NewGrafanaServer() *GrafanaServerImpl { // NewServer returns a new instance of Server.
func NewServer(configFile, homePath, pidFile string) *Server {
rootCtx, shutdownFn := context.WithCancel(context.Background()) rootCtx, shutdownFn := context.WithCancel(context.Background())
childRoutines, childCtx := errgroup.WithContext(rootCtx) childRoutines, childCtx := errgroup.WithContext(rootCtx)
return &GrafanaServerImpl{ return &Server{
context: childCtx, context: childCtx,
shutdownFn: shutdownFn, shutdownFn: shutdownFn,
childRoutines: childRoutines, childRoutines: childRoutines,
log: log.New("server"), log: log.New("server"),
cfg: setting.NewCfg(), cfg: setting.NewCfg(),
configFile: configFile,
homePath: homePath,
pidFile: pidFile,
} }
} }
type GrafanaServerImpl struct { // Server is responsible for managing the lifecycle of services.
type Server struct {
context context.Context context context.Context
shutdownFn context.CancelFunc shutdownFn context.CancelFunc
childRoutines *errgroup.Group childRoutines *errgroup.Group
...@@ -63,75 +69,48 @@ type GrafanaServerImpl struct { ...@@ -63,75 +69,48 @@ type GrafanaServerImpl struct {
shutdownReason string shutdownReason string
shutdownInProgress bool shutdownInProgress bool
configFile string
homePath string
pidFile string
RouteRegister routing.RouteRegister `inject:""` RouteRegister routing.RouteRegister `inject:""`
HttpServer *api.HTTPServer `inject:""` HTTPServer *api.HTTPServer `inject:""`
} }
func (g *GrafanaServerImpl) Run() error { // Run initializes and starts services. This will block until all services have
var err error // exited. To initiate shutdown, call the Shutdown method in another goroutine.
g.loadConfiguration() func (s *Server) Run() error {
g.writePIDFile() s.loadConfiguration()
s.writePIDFile()
login.Init() login.Init()
social.NewOAuthService() social.NewOAuthService()
serviceGraph := inject.Graph{}
err = serviceGraph.Provide(&inject.Object{Value: bus.GetBus()})
if err != nil {
return fmt.Errorf("Failed to provide object to the graph: %v", err)
}
err = serviceGraph.Provide(&inject.Object{Value: g.cfg})
if err != nil {
return fmt.Errorf("Failed to provide object to the graph: %v", err)
}
err = serviceGraph.Provide(&inject.Object{Value: routing.NewRouteRegister(middleware.RequestMetrics, middleware.RequestTracing)})
if err != nil {
return fmt.Errorf("Failed to provide object to the graph: %v", err)
}
err = serviceGraph.Provide(&inject.Object{Value: localcache.New(5*time.Minute, 10*time.Minute)})
if err != nil {
return fmt.Errorf("Failed to provide object to the graph: %v", err)
}
// self registered services
services := registry.GetServices() services := registry.GetServices()
// Add all services to dependency graph if err := s.buildServiceGraph(services); err != nil {
for _, service := range services { return err
err = serviceGraph.Provide(&inject.Object{Value: service.Instance})
if err != nil {
return fmt.Errorf("Failed to provide object to the graph: %v", err)
}
}
err = serviceGraph.Provide(&inject.Object{Value: g})
if err != nil {
return fmt.Errorf("Failed to provide object to the graph: %v", err)
}
// Inject dependencies to services
if err := serviceGraph.Populate(); err != nil {
return fmt.Errorf("Failed to populate service dependency: %v", err)
} }
// Init & start services // Initialize services.
for _, service := range services { for _, service := range services {
if registry.IsDisabled(service.Instance) { if registry.IsDisabled(service.Instance) {
continue continue
} }
g.log.Info("Initializing " + service.Name) s.log.Info("Initializing " + service.Name)
if err := service.Instance.Init(); err != nil { if err := service.Instance.Init(); err != nil {
return fmt.Errorf("Service init failed: %v", err) return fmt.Errorf("Service init failed: %v", err)
} }
} }
// Start background services // Start background services.
for _, srv := range services { for _, svc := range services {
// variable needed for accessing loop variable in function callback // Variable is needed for accessing loop variable in function callback
descriptor := srv descriptor := svc
service, ok := srv.Instance.(registry.BackgroundService)
service, ok := svc.Instance.(registry.BackgroundService)
if !ok { if !ok {
continue continue
} }
...@@ -140,98 +119,140 @@ func (g *GrafanaServerImpl) Run() error { ...@@ -140,98 +119,140 @@ func (g *GrafanaServerImpl) Run() error {
continue continue
} }
g.childRoutines.Go(func() error { s.childRoutines.Go(func() error {
// Skip starting new service when shutting down // Don't start new services when server is shutting down.
// Can happen when service stop/return during startup if s.shutdownInProgress {
if g.shutdownInProgress {
return nil return nil
} }
err := service.Run(g.context) if err := service.Run(s.context); err != nil {
if err != context.Canceled {
// If error is not canceled then the service crashed // Server has crashed.
if err != context.Canceled && err != nil { s.log.Error("Stopped "+descriptor.Name, "reason", err)
g.log.Error("Stopped "+descriptor.Name, "reason", err) } else {
} else { s.log.Info("Stopped "+descriptor.Name, "reason", err)
g.log.Info("Stopped "+descriptor.Name, "reason", err) }
} }
// Mark that we are in shutdown mode s.shutdownInProgress = true
// So more services are not started
g.shutdownInProgress = true return nil
return err
}) })
} }
sendSystemdNotification("READY=1") notifySystemd("READY=1")
return g.childRoutines.Wait() return s.childRoutines.Wait()
} }
func (g *GrafanaServerImpl) loadConfiguration() { // Shutdown initiates a shutdown of the services, and waits for all services to
err := g.cfg.Load(&setting.CommandLineArgs{ // exit.
Config: *configFile, func (s *Server) Shutdown(reason string) {
HomePath: *homePath, s.log.Info("Shutdown started", "reason", reason)
Args: flag.Args(), s.shutdownReason = reason
}) s.shutdownInProgress = true
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to start grafana. error: %s\n", err.Error())
os.Exit(1)
}
g.log.Info("Starting "+setting.ApplicationName, "version", version, "commit", commit, "branch", buildBranch, "compiled", time.Unix(setting.BuildStamp, 0))
g.cfg.LogConfigSources()
}
func (g *GrafanaServerImpl) Shutdown(reason string) {
g.log.Info("Shutdown started", "reason", reason)
g.shutdownReason = reason
g.shutdownInProgress = true
// call cancel func on root context // call cancel func on root context
g.shutdownFn() s.shutdownFn()
// wait for child routines // wait for child routines
g.childRoutines.Wait() s.childRoutines.Wait()
} }
func (g *GrafanaServerImpl) Exit(reason error) int { // ExitCode returns an exit code for a given error.
// default exit code is 1 func (s *Server) ExitCode(reason error) int {
code := 1 code := 1
if reason == context.Canceled && g.shutdownReason != "" { if reason == context.Canceled && s.shutdownReason != "" {
reason = fmt.Errorf(g.shutdownReason) reason = fmt.Errorf(s.shutdownReason)
code = 0 code = 0
} }
g.log.Error("Server shutdown", "reason", reason) s.log.Error("Server shutdown", "reason", reason)
return code return code
} }
func (g *GrafanaServerImpl) writePIDFile() { // writePIDFile retrieves the current process ID and writes it to file.
if *pidFile == "" { func (s *Server) writePIDFile() {
if s.pidFile == "" {
return return
} }
// Ensure the required directory structure exists. // Ensure the required directory structure exists.
err := os.MkdirAll(filepath.Dir(*pidFile), 0700) err := os.MkdirAll(filepath.Dir(s.pidFile), 0700)
if err != nil { if err != nil {
g.log.Error("Failed to verify pid directory", "error", err) s.log.Error("Failed to verify pid directory", "error", err)
os.Exit(1) os.Exit(1)
} }
// Retrieve the PID and write it. // Retrieve the PID and write it to file.
pid := strconv.Itoa(os.Getpid()) pid := strconv.Itoa(os.Getpid())
if err := ioutil.WriteFile(*pidFile, []byte(pid), 0644); err != nil { if err := ioutil.WriteFile(s.pidFile, []byte(pid), 0644); err != nil {
g.log.Error("Failed to write pidfile", "error", err) s.log.Error("Failed to write pidfile", "error", err)
os.Exit(1) os.Exit(1)
} }
g.log.Info("Writing PID file", "path", *pidFile, "pid", pid) s.log.Info("Writing PID file", "path", s.pidFile, "pid", pid)
} }
func sendSystemdNotification(state string) error { // buildServiceGraph builds a graph of services and their dependencies.
func (s *Server) buildServiceGraph(services []*registry.Descriptor) error {
// Specify service dependencies.
objs := []interface{}{
bus.GetBus(),
s.cfg,
routing.NewRouteRegister(middleware.RequestMetrics, middleware.RequestTracing),
localcache.New(5*time.Minute, 10*time.Minute),
s,
}
for _, service := range services {
objs = append(objs, service.Instance)
}
var serviceGraph inject.Graph
// Provide services and their dependencies to the graph.
for _, obj := range objs {
if err := serviceGraph.Provide(&inject.Object{Value: obj}); err != nil {
return fmt.Errorf("Failed to provide object to the graph: %v", err)
}
}
// Resolve services and their dependencies.
if err := serviceGraph.Populate(); err != nil {
return fmt.Errorf("Failed to populate service dependency: %v", err)
}
return nil
}
// loadConfiguration loads settings and configuration from config files.
func (s *Server) loadConfiguration() {
args := &setting.CommandLineArgs{
Config: s.configFile,
HomePath: s.homePath,
Args: flag.Args(),
}
if err := s.cfg.Load(args); err != nil {
fmt.Fprintf(os.Stderr, "Failed to start grafana. error: %s\n", err.Error())
os.Exit(1)
}
s.log.Info("Starting "+setting.ApplicationName,
"version", version,
"commit", commit,
"branch", buildBranch,
"compiled", time.Unix(setting.BuildStamp, 0),
)
s.cfg.LogConfigSources()
}
// notifySystemd sends state notifications to systemd.
func notifySystemd(state string) error {
notifySocket := os.Getenv("NOTIFY_SOCKET") notifySocket := os.Getenv("NOTIFY_SOCKET")
if notifySocket == "" { if notifySocket == "" {
...@@ -244,14 +265,12 @@ func sendSystemdNotification(state string) error { ...@@ -244,14 +265,12 @@ func sendSystemdNotification(state string) error {
} }
conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr) conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
if err != nil { if err != nil {
return err return err
} }
defer conn.Close()
_, err = conn.Write([]byte(state)) _, err = conn.Write([]byte(state))
conn.Close()
return err return err
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment