Commit 71ffd1d1 by Marcus Efraimsson Committed by GitHub

Alerting: Fix image rendering and uploading timeout preventing alert notifications from being sent (#21536)

* svc alerting - use a shorter ctx to upload the img
This will prevent a timeout on image upload from cancelling the sending of notifications

* components img uploader - pass the ctx to aws lib

* make webdavuploader use the ctx

* make azureblobuploader use the ctx

* rename uploadImage() to renderAndUploadImage()
for better clarity about what this method does

* Use timeout + 2s for plugin renderer (same as service and phantomjs)

* Make sure that original EvalContext is updated after render and upload

* Verify notification sent even if render or image upload times out

* fix lint

* fixes after review

Co-authored-by: Edouard Hur <3418467+hekmon@users.noreply.github.com>

Fixes #21018
parent 6e412d88
...@@ -59,7 +59,7 @@ func (az *AzureBlobUploader) Upload(ctx context.Context, imageDiskPath string) ( ...@@ -59,7 +59,7 @@ func (az *AzureBlobUploader) Upload(ctx context.Context, imageDiskPath string) (
randomFileName += pngExt randomFileName += pngExt
// upload image // upload image
az.log.Debug("Uploading image to azure_blob", "container_name", az.container_name, "blob_name", randomFileName) az.log.Debug("Uploading image to azure_blob", "container_name", az.container_name, "blob_name", randomFileName)
resp, err := blob.FileUpload(az.container_name, randomFileName, file) resp, err := blob.FileUpload(ctx, az.container_name, randomFileName, file)
if err != nil { if err != nil {
return "", err return "", err
} }
...@@ -162,7 +162,7 @@ func copyHeadersToRequest(req *http.Request, headers map[string]string) { ...@@ -162,7 +162,7 @@ func copyHeadersToRequest(req *http.Request, headers map[string]string) {
} }
} }
func (c *StorageClient) FileUpload(container, blobName string, body io.Reader) (*http.Response, error) { func (c *StorageClient) FileUpload(ctx context.Context, container, blobName string, body io.Reader) (*http.Response, error) {
blobName = escape(blobName) blobName = escape(blobName)
extension := strings.ToLower(path.Ext(blobName)) extension := strings.ToLower(path.Ext(blobName))
contentType := mime.TypeByExtension(extension) contentType := mime.TypeByExtension(extension)
...@@ -178,6 +178,9 @@ func (c *StorageClient) FileUpload(container, blobName string, body io.Reader) ( ...@@ -178,6 +178,9 @@ func (c *StorageClient) FileUpload(container, blobName string, body io.Reader) (
if err != nil { if err != nil {
return nil, err return nil, err
} }
if ctx != nil {
req = req.WithContext(ctx)
}
copyHeadersToRequest(req, map[string]string{ copyHeadersToRequest(req, map[string]string{
"x-ms-blob-type": "BlockBlob", "x-ms-blob-type": "BlockBlob",
......
...@@ -83,7 +83,7 @@ func (u *S3Uploader) Upload(ctx context.Context, imageDiskPath string) (string, ...@@ -83,7 +83,7 @@ func (u *S3Uploader) Upload(ctx context.Context, imageDiskPath string) (string,
return "", err return "", err
} }
uploader := s3manager.NewUploader(sess) uploader := s3manager.NewUploader(sess)
result, err := uploader.Upload(&s3manager.UploadInput{ result, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(u.bucket), Bucket: aws.String(u.bucket),
Key: aws.String(key), Key: aws.String(key),
ACL: aws.String(u.acl), ACL: aws.String(u.acl),
......
...@@ -64,7 +64,9 @@ func (u *WebdavUploader) Upload(ctx context.Context, pa string) (string, error) ...@@ -64,7 +64,9 @@ func (u *WebdavUploader) Upload(ctx context.Context, pa string) (string, error)
if err != nil { if err != nil {
return "", err return "", err
} }
if ctx != nil {
req = req.WithContext(ctx)
}
if u.username != "" { if u.username != "" {
req.SetBasicAuth(u.username, u.password) req.SetBasicAuth(u.username, u.password)
} }
......
package alerting package alerting
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"time"
"github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/imguploader" "github.com/grafana/grafana/pkg/components/imguploader"
...@@ -39,8 +41,8 @@ type notificationService struct { ...@@ -39,8 +41,8 @@ type notificationService struct {
renderService rendering.Service renderService rendering.Service
} }
func (n *notificationService) SendIfNeeded(context *EvalContext) error { func (n *notificationService) SendIfNeeded(evalCtx *EvalContext) error {
notifierStates, err := n.getNeededNotifiers(context.Rule.OrgID, context.Rule.Notifications, context) notifierStates, err := n.getNeededNotifiers(evalCtx.Rule.OrgID, evalCtx.Rule.Notifications, evalCtx)
if err != nil { if err != nil {
n.log.Error("Failed to get alert notifiers", "error", err) n.log.Error("Failed to get alert notifiers", "error", err)
return err return err
...@@ -51,12 +53,22 @@ func (n *notificationService) SendIfNeeded(context *EvalContext) error { ...@@ -51,12 +53,22 @@ func (n *notificationService) SendIfNeeded(context *EvalContext) error {
} }
if notifierStates.ShouldUploadImage() { if notifierStates.ShouldUploadImage() {
if err = n.uploadImage(context); err != nil { // Create a copy of EvalContext and give it a new, shorter, timeout context to upload the image
n.log.Error("Failed to upload alert panel image.", "error", err) uploadEvalCtx := *evalCtx
timeout := setting.AlertingNotificationTimeout / 2
var uploadCtxCancel func()
uploadEvalCtx.Ctx, uploadCtxCancel = context.WithTimeout(evalCtx.Ctx, timeout)
// Try to upload the image without consuming all the time allocated for EvalContext
if err = n.renderAndUploadImage(&uploadEvalCtx, timeout); err != nil {
n.log.Error("Failed to render and upload alert panel image.", "ruleId", uploadEvalCtx.Rule.ID, "error", err)
} }
uploadCtxCancel()
evalCtx.ImageOnDiskPath = uploadEvalCtx.ImageOnDiskPath
evalCtx.ImagePublicURL = uploadEvalCtx.ImagePublicURL
} }
return n.sendNotifications(context, notifierStates) return n.sendNotifications(evalCtx, notifierStates)
} }
func (n *notificationService) sendAndMarkAsComplete(evalContext *EvalContext, notifierState *notifierState) error { func (n *notificationService) sendAndMarkAsComplete(evalContext *EvalContext, notifierState *notifierState) error {
...@@ -123,7 +135,7 @@ func (n *notificationService) sendNotifications(evalContext *EvalContext, notifi ...@@ -123,7 +135,7 @@ func (n *notificationService) sendNotifications(evalContext *EvalContext, notifi
return nil return nil
} }
func (n *notificationService) uploadImage(context *EvalContext) (err error) { func (n *notificationService) renderAndUploadImage(evalCtx *EvalContext, timeout time.Duration) (err error) {
uploader, err := newImageUploaderProvider() uploader, err := newImageUploaderProvider()
if err != nil { if err != nil {
return err return err
...@@ -132,32 +144,41 @@ func (n *notificationService) uploadImage(context *EvalContext) (err error) { ...@@ -132,32 +144,41 @@ func (n *notificationService) uploadImage(context *EvalContext) (err error) {
renderOpts := rendering.Opts{ renderOpts := rendering.Opts{
Width: 1000, Width: 1000,
Height: 500, Height: 500,
Timeout: setting.AlertingEvaluationTimeout, Timeout: timeout,
OrgId: context.Rule.OrgID, OrgId: evalCtx.Rule.OrgID,
OrgRole: models.ROLE_ADMIN, OrgRole: models.ROLE_ADMIN,
ConcurrentLimit: setting.AlertingRenderLimit, ConcurrentLimit: setting.AlertingRenderLimit,
} }
ref, err := context.GetDashboardUID() ref, err := evalCtx.GetDashboardUID()
if err != nil { if err != nil {
return err return err
} }
renderOpts.Path = fmt.Sprintf("d-solo/%s/%s?orgId=%d&panelId=%d", ref.Uid, ref.Slug, context.Rule.OrgID, context.Rule.PanelID) renderOpts.Path = fmt.Sprintf("d-solo/%s/%s?orgId=%d&panelId=%d", ref.Uid, ref.Slug, evalCtx.Rule.OrgID, evalCtx.Rule.PanelID)
result, err := n.renderService.Render(context.Ctx, renderOpts) n.log.Debug("Rendering alert panel image", "ruleId", evalCtx.Rule.ID, "urlPath", renderOpts.Path)
start := time.Now()
result, err := n.renderService.Render(evalCtx.Ctx, renderOpts)
if err != nil { if err != nil {
return err return err
} }
took := time.Since(start)
context.ImageOnDiskPath = result.FilePath n.log.Debug("Rendered alert panel image", "ruleId", evalCtx.Rule.ID, "path", result.FilePath, "took", took)
context.ImagePublicURL, err = uploader.Upload(context.Ctx, context.ImageOnDiskPath)
evalCtx.ImageOnDiskPath = result.FilePath
n.log.Debug("Uploading alert panel image to external image store", "ruleId", evalCtx.Rule.ID, "path", evalCtx.ImageOnDiskPath)
start = time.Now()
evalCtx.ImagePublicURL, err = uploader.Upload(evalCtx.Ctx, evalCtx.ImageOnDiskPath)
if err != nil { if err != nil {
return err return err
} }
took = time.Since(start)
if context.ImagePublicURL != "" { if evalCtx.ImagePublicURL != "" {
n.log.Info("uploaded screenshot of alert to external image store", "url", context.ImagePublicURL) n.log.Debug("Uploaded alert panel image to external image store", "ruleId", evalCtx.Rule.ID, "url", evalCtx.ImagePublicURL, "took", took)
} }
return nil return nil
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/services/rendering" "github.com/grafana/grafana/pkg/services/rendering"
...@@ -29,7 +30,7 @@ func TestNotificationService(t *testing.T) { ...@@ -29,7 +30,7 @@ func TestNotificationService(t *testing.T) {
} }
evalCtx := NewEvalContext(context.Background(), testRule) evalCtx := NewEvalContext(context.Background(), testRule)
notificationServiceScenario(t, "SendIfNeeded should render and upload image and send notification", evalCtx, true, func(scenarioCtx *scenarioContext) { notificationServiceScenario(t, "Given alert rule with upload image enabled should render and upload image and send notification", evalCtx, true, func(scenarioCtx *scenarioContext) {
err := scenarioCtx.notificationService.SendIfNeeded(evalCtx) err := scenarioCtx.notificationService.SendIfNeeded(evalCtx)
require.NoError(t, err) require.NoError(t, err)
...@@ -38,12 +39,70 @@ func TestNotificationService(t *testing.T) { ...@@ -38,12 +39,70 @@ func TestNotificationService(t *testing.T) {
require.Truef(t, evalCtx.Ctx.Value("notificationSent").(bool), "expected notification to be sent, but wasn't") require.Truef(t, evalCtx.Ctx.Value("notificationSent").(bool), "expected notification to be sent, but wasn't")
}) })
notificationServiceScenario(t, "SendIfNeeded should not render and upload image, but send notification", evalCtx, false, func(scenarioCtx *scenarioContext) { notificationServiceScenario(t, "Given alert rule with upload image disabled should not render and upload image, but send notification", evalCtx, false, func(scenarioCtx *scenarioContext) {
err := scenarioCtx.notificationService.SendIfNeeded(evalCtx) err := scenarioCtx.notificationService.SendIfNeeded(evalCtx)
require.NoError(t, err) require.NoError(t, err)
require.Equalf(t, 0, scenarioCtx.renderCount, "expected render to be called, but wasn't") require.Equalf(t, 0, scenarioCtx.renderCount, "expected render not to be called, but it was")
require.Equalf(t, 0, scenarioCtx.imageUploadCount, "expected image to be uploaded, but wasn't") require.Equalf(t, 0, scenarioCtx.imageUploadCount, "expected image not to be uploaded, but it was")
require.Truef(t, evalCtx.Ctx.Value("notificationSent").(bool), "expected notification to be sent, but wasn't")
})
notificationServiceScenario(t, "Given alert rule with upload image enabled and render times out should send notification", evalCtx, true, func(scenarioCtx *scenarioContext) {
setting.AlertingNotificationTimeout = 200 * time.Millisecond
scenarioCtx.renderProvider = func(ctx context.Context, opts rendering.Opts) (*rendering.RenderResult, error) {
wait := make(chan bool)
go func() {
time.Sleep(1 * time.Second)
wait <- true
}()
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return nil, err
}
break
case <-wait:
}
return nil, nil
}
err := scenarioCtx.notificationService.SendIfNeeded(evalCtx)
require.NoError(t, err)
require.Equalf(t, 0, scenarioCtx.renderCount, "expected render not to be called, but it was")
require.Equalf(t, 0, scenarioCtx.imageUploadCount, "expected image not to be uploaded, but it was")
require.Truef(t, evalCtx.Ctx.Value("notificationSent").(bool), "expected notification to be sent, but wasn't")
})
notificationServiceScenario(t, "Given alert rule with upload image enabled and upload times out should send notification", evalCtx, true, func(scenarioCtx *scenarioContext) {
setting.AlertingNotificationTimeout = 200 * time.Millisecond
scenarioCtx.uploadProvider = func(ctx context.Context, path string) (string, error) {
wait := make(chan bool)
go func() {
time.Sleep(1 * time.Second)
wait <- true
}()
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return "", err
}
break
case <-wait:
}
return "", nil
}
err := scenarioCtx.notificationService.SendIfNeeded(evalCtx)
require.NoError(t, err)
require.Equalf(t, 1, scenarioCtx.renderCount, "expected render to be called, but wasn't")
require.Equalf(t, 0, scenarioCtx.imageUploadCount, "expected image not to be uploaded, but it was")
require.Truef(t, evalCtx.Ctx.Value("notificationSent").(bool), "expected notification to be sent, but wasn't") require.Truef(t, evalCtx.Ctx.Value("notificationSent").(bool), "expected notification to be sent, but wasn't")
}) })
} }
...@@ -53,6 +112,8 @@ type scenarioContext struct { ...@@ -53,6 +112,8 @@ type scenarioContext struct {
notificationService *notificationService notificationService *notificationService
imageUploadCount int imageUploadCount int
renderCount int renderCount int
uploadProvider func(ctx context.Context, path string) (string, error)
renderProvider func(ctx context.Context, opts rendering.Opts) (*rendering.RenderResult, error)
} }
type scenarioFunc func(c *scenarioContext) type scenarioFunc func(c *scenarioContext)
...@@ -100,14 +161,26 @@ func notificationServiceScenario(t *testing.T, name string, evalCtx *EvalContext ...@@ -100,14 +161,26 @@ func notificationServiceScenario(t *testing.T, name string, evalCtx *EvalContext
return nil return nil
}) })
setting.AlertingNotificationTimeout = 30 * time.Second
scenarioCtx := &scenarioContext{ scenarioCtx := &scenarioContext{
evalCtx: evalCtx, evalCtx: evalCtx,
} }
uploadProvider := func(ctx context.Context, path string) (string, error) {
scenarioCtx.imageUploadCount++
return "", nil
}
imageUploader := &testImageUploader{ imageUploader := &testImageUploader{
uploadProvider: func(ctx context.Context, path string) (string, error) { uploadProvider: func(ctx context.Context, path string) (string, error) {
scenarioCtx.imageUploadCount++ if scenarioCtx.uploadProvider != nil {
return "", nil if _, err := scenarioCtx.uploadProvider(ctx, path); err != nil {
return "", err
}
}
return uploadProvider(ctx, path)
}, },
} }
...@@ -119,10 +192,20 @@ func notificationServiceScenario(t *testing.T, name string, evalCtx *EvalContext ...@@ -119,10 +192,20 @@ func notificationServiceScenario(t *testing.T, name string, evalCtx *EvalContext
newImageUploaderProvider = origNewImageUploaderProvider newImageUploaderProvider = origNewImageUploaderProvider
}() }()
renderProvider := func(ctx context.Context, opts rendering.Opts) (*rendering.RenderResult, error) {
scenarioCtx.renderCount++
return &rendering.RenderResult{FilePath: "image.png"}, nil
}
renderService := &testRenderService{ renderService := &testRenderService{
renderProvider: func(ctx context.Context, opts rendering.Opts) (*rendering.RenderResult, error) { renderProvider: func(ctx context.Context, opts rendering.Opts) (*rendering.RenderResult, error) {
scenarioCtx.renderCount++ if scenarioCtx.renderProvider != nil {
return &rendering.RenderResult{FilePath: "image.png"}, nil if _, err := scenarioCtx.renderProvider(ctx, opts); err != nil {
return nil, err
}
}
return renderProvider(ctx, opts)
}, },
} }
......
...@@ -60,6 +60,7 @@ func (rs *RenderingService) renderViaHttp(ctx context.Context, opts Opts) (*Rend ...@@ -60,6 +60,7 @@ func (rs *RenderingService) renderViaHttp(ctx context.Context, opts Opts) (*Rend
req.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion)) req.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
// gives service some additional time to timeout and return possible errors.
reqContext, cancel := context.WithTimeout(ctx, opts.Timeout+time.Second*2) reqContext, cancel := context.WithTimeout(ctx, opts.Timeout+time.Second*2)
defer cancel() defer cancel()
......
...@@ -63,6 +63,7 @@ func (rs *RenderingService) renderViaPhantomJS(ctx context.Context, opts Opts) ( ...@@ -63,6 +63,7 @@ func (rs *RenderingService) renderViaPhantomJS(ctx context.Context, opts Opts) (
cmdArgs = append([]string{fmt.Sprintf("--output-encoding=%s", opts.Encoding)}, cmdArgs...) cmdArgs = append([]string{fmt.Sprintf("--output-encoding=%s", opts.Encoding)}, cmdArgs...)
} }
// gives phantomjs some additional time to timeout and return possible errors.
commandCtx, cancel := context.WithTimeout(ctx, opts.Timeout+time.Second*2) commandCtx, cancel := context.WithTimeout(ctx, opts.Timeout+time.Second*2)
defer cancel() defer cancel()
......
...@@ -3,6 +3,7 @@ package rendering ...@@ -3,6 +3,7 @@ package rendering
import ( import (
"context" "context"
"fmt" "fmt"
"time"
pluginModel "github.com/grafana/grafana-plugin-model/go/renderer" pluginModel "github.com/grafana/grafana-plugin-model/go/renderer"
) )
...@@ -22,7 +23,8 @@ func (rs *RenderingService) renderViaPlugin(ctx context.Context, opts Opts) (*Re ...@@ -22,7 +23,8 @@ func (rs *RenderingService) renderViaPlugin(ctx context.Context, opts Opts) (*Re
return nil, err return nil, err
} }
ctx, cancel := context.WithTimeout(ctx, opts.Timeout) // gives plugin some additional time to timeout and return possible errors.
ctx, cancel := context.WithTimeout(ctx, opts.Timeout+time.Second*2)
defer cancel() defer cancel()
req := &pluginModel.RenderRequest{ req := &pluginModel.RenderRequest{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment