Commit 783391a8 by kay delaney Committed by GitHub

CloudWatch Logs: Adjusts CloudWatch Logs timeout logic (#26621)

* CloudWatch Logs: Adjusts CloudWatch Logs timeout logic

Previously CloudWatch Logs queries would time out if,
after a number of attempts, a response was received with no additional data.
This commit changes the behavior so that a consecutive number of requests
yielding no additional data must be made before we cancel the query
parent 58af5413
......@@ -399,6 +399,12 @@ export interface DataQuery {
datasource?: string | null;
}
export enum DataQueryErrorType {
Cancelled = 'cancelled',
Timeout = 'timeout',
Unknown = 'unknown',
}
export interface DataQueryError {
data?: {
message?: string;
......@@ -408,7 +414,7 @@ export interface DataQueryError {
status?: string;
statusText?: string;
refId?: string;
cancelled?: boolean;
type?: DataQueryErrorType;
}
export interface DataQueryRequest<TQuery extends DataQuery = DataQuery> {
......
......@@ -2,8 +2,8 @@ import { from, merge, MonoTypeOperatorFunction, Observable, Subject, Subscriptio
import { catchError, filter, map, mergeMap, retryWhen, share, takeUntil, tap, throwIfEmpty } from 'rxjs/operators';
import { fromFetch } from 'rxjs/fetch';
import { v4 as uuidv4 } from 'uuid';
import { BackendSrv as BackendService, BackendSrvRequest, FetchError, FetchResponse } from '@grafana/runtime';
import { AppEvents } from '@grafana/data';
import { BackendSrv as BackendService, BackendSrvRequest, FetchResponse, FetchError } from '@grafana/runtime';
import { AppEvents, DataQueryErrorType } from '@grafana/data';
import appEvents from 'app/core/app_events';
import config, { getConfig } from 'app/core/config';
......@@ -341,7 +341,7 @@ export class BackendSrv implements BackendService {
// when a request is cancelled by takeUntil it will complete without emitting anything so we use throwIfEmpty to identify this case
// in throwIfEmpty we'll then throw an cancelled error and then we'll return the correct result in the catchError or rethrow
throwIfEmpty(() => ({
cancelled: true,
type: DataQueryErrorType.Cancelled,
data: null,
status: this.HTTP_REQUEST_CANCELED,
statusText: 'Request was aborted',
......
import 'whatwg-fetch'; // fetch polyfill needed for PhantomJs rendering
import { Observable, of } from 'rxjs';
import { delay } from 'rxjs/operators';
import { AppEvents } from '@grafana/data';
import { AppEvents, DataQueryErrorType } from '@grafana/data';
import { BackendSrv } from '../services/backend_srv';
import { Emitter } from '../utils/emitter';
......@@ -352,7 +352,7 @@ describe('backendSrv', () => {
expect(unsubscribe).toHaveBeenCalledTimes(1);
expect(slowError).toEqual({
cancelled: true,
type: DataQueryErrorType.Cancelled,
data: null,
status: -1,
statusText: 'Request was aborted',
......@@ -539,7 +539,7 @@ describe('backendSrv', () => {
catchedError = err;
}
expect(catchedError.cancelled).toEqual(true);
expect(catchedError.type).toEqual(DataQueryErrorType.Cancelled);
expect(catchedError.statusText).toEqual('Request was aborted');
expect(unsubscribe).toHaveBeenCalledTimes(2);
});
......
......@@ -13,7 +13,6 @@ const makeError = (propOverrides?: DataQueryError): DataQueryError => {
status: 'Error status',
statusText: 'Error status text',
refId: 'A',
cancelled: false,
};
Object.assign(queryError, propOverrides);
return queryError;
......
......@@ -13,6 +13,7 @@ import {
ExploreMode,
LogsDedupStrategy,
sortLogsResult,
DataQueryErrorType,
} from '@grafana/data';
import { RefreshPicker } from '@grafana/ui';
import { LocationUpdate } from '@grafana/runtime';
......@@ -510,7 +511,13 @@ export const processQueryResponse = (
const { request, state: loadingState, series, error } = response;
if (error) {
if (error.cancelled) {
if (error.type === DataQueryErrorType.Timeout) {
return {
...state,
queryResponse: response,
loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
};
} else if (error.type === DataQueryErrorType.Cancelled) {
return state;
}
......
......@@ -18,6 +18,7 @@ import {
TimeRange,
toDataFrame,
rangeUtil,
DataQueryErrorType,
} from '@grafana/data';
import { getBackendSrv, toDataQueryResponse } from '@grafana/runtime';
import { TemplateSrv } from 'app/features/templating/template_srv';
......@@ -39,13 +40,14 @@ import {
MetricRequest,
TSDBResponse,
} from './types';
import { empty, from, Observable, of, merge } from 'rxjs';
import { catchError, delay, expand, finalize, map, mergeMap, tap } from 'rxjs/operators';
import { from, Observable, of, merge, zip } from 'rxjs';
import { catchError, finalize, map, mergeMap, tap, concatMap, scan, share, repeat, takeWhile } from 'rxjs/operators';
import { CloudWatchLanguageProvider } from './language_provider';
import { VariableWithMultiSupport } from 'app/features/variables/types';
import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider';
import { AwsUrl, encodeUrl } from './aws_url';
import { increasingInterval } from './utils/rxjs/increasingInterval';
const TSDB_QUERY_ENDPOINT = '/api/tsdb/query';
......@@ -67,9 +69,7 @@ const displayAlert = (datasourceName: string, region: string) =>
const displayCustomError = (title: string, message: string) =>
store.dispatch(notifyApp(createErrorNotification(title, message)));
// TODO: Temporary times here, could just change to some fixed number.
export const MAX_ATTEMPTS = 8;
const POLLING_TIMES = [100, 200, 500, 1000];
export const MAX_ATTEMPTS = 5;
export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWatchJsonData> {
type: any;
......@@ -234,65 +234,80 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
statsQuery: (param.statsGroups?.length ?? 0) > 0 ?? false,
};
});
let prevRecordsMatched: Record<string, number> = {};
return withTeardown(
this.makeLogActionRequest('GetQueryResults', queryParams).pipe(
expand((dataFrames, i) => {
const allFramesCompleted = dataFrames.every(
dataFrame => dataFrame.meta?.custom?.['Status'] === CloudWatchLogsQueryStatus.Complete
);
return allFramesCompleted
? empty()
: this.makeLogActionRequest('GetQueryResults', queryParams).pipe(
map(frames => {
let moreRecordsMatched = false;
for (const frame of frames) {
const recordsMatched = frame.meta?.stats?.find(stat => stat.displayName === 'Records matched')
?.value!;
if (recordsMatched > (prevRecordsMatched[frame.refId!] ?? 0)) {
moreRecordsMatched = true;
}
prevRecordsMatched[frame.refId!] = recordsMatched;
}
const noProgressMade = i >= MAX_ATTEMPTS - 2 && !moreRecordsMatched;
if (noProgressMade) {
for (const frame of frames) {
_.set(frame, 'meta.custom.Status', CloudWatchLogsQueryStatus.Complete);
}
}
const dataFrames = increasingInterval({ startPeriod: 100, endPeriod: 1000, step: 300 }).pipe(
concatMap(_ => this.makeLogActionRequest('GetQueryResults', queryParams)),
repeat(),
share()
);
return frames;
}),
delay(POLLING_TIMES[Math.min(i, POLLING_TIMES.length - 1)])
);
}),
tap(dataFrames => {
dataFrames.forEach((dataframe, i) => {
if (
[
CloudWatchLogsQueryStatus.Complete,
CloudWatchLogsQueryStatus.Cancelled,
CloudWatchLogsQueryStatus.Failed,
].includes(dataframe.meta?.custom?.['Status']) &&
this.logQueries.hasOwnProperty(dataframe.refId!)
) {
delete this.logQueries[dataframe.refId!];
const consecutiveFailedAttempts = dataFrames.pipe(
scan(
({ failures, prevRecordsMatched }, frames) => {
failures++;
for (const frame of frames) {
const recordsMatched = frame.meta?.stats?.find(stat => stat.displayName === 'Records matched')?.value!;
if (recordsMatched > (prevRecordsMatched[frame.refId!] ?? 0)) {
failures = 0;
}
});
}),
map(dataFrames => ({
prevRecordsMatched[frame.refId!] = recordsMatched;
}
return { failures, prevRecordsMatched };
},
{ failures: 0, prevRecordsMatched: {} as Record<string, number> }
),
map(({ failures }) => failures),
share()
);
const queryResponse: Observable<DataQueryResponse> = zip(dataFrames, consecutiveFailedAttempts).pipe(
tap(([dataFrames]) => {
for (const frame of dataFrames) {
if (
[
CloudWatchLogsQueryStatus.Complete,
CloudWatchLogsQueryStatus.Cancelled,
CloudWatchLogsQueryStatus.Failed,
].includes(frame.meta?.custom?.['Status']) &&
this.logQueries.hasOwnProperty(frame.refId!)
) {
delete this.logQueries[frame.refId!];
}
}
}),
map(([dataFrames, failedAttempts]) => {
if (failedAttempts >= MAX_ATTEMPTS) {
for (const frame of dataFrames) {
_.set(frame, 'meta.custom.Status', CloudWatchLogsQueryStatus.Cancelled);
}
}
return {
data: dataFrames,
key: 'test-key',
state: dataFrames.every(
dataFrame => dataFrame.meta?.custom?.['Status'] === CloudWatchLogsQueryStatus.Complete
state: dataFrames.every(dataFrame =>
[
CloudWatchLogsQueryStatus.Complete,
CloudWatchLogsQueryStatus.Cancelled,
CloudWatchLogsQueryStatus.Failed,
].includes(dataFrame.meta?.custom?.['Status'])
)
? LoadingState.Done
: LoadingState.Loading,
}))
),
() => this.stopQueries()
error:
failedAttempts >= MAX_ATTEMPTS
? {
message: `error: query timed out after ${MAX_ATTEMPTS} attempts`,
type: DataQueryErrorType.Timeout,
}
: undefined,
};
}),
takeWhile(({ state }) => state !== LoadingState.Error && state !== LoadingState.Done, true)
);
return withTeardown(queryResponse, () => this.stopQueries());
}
private addDataLinksToLogsResponse(response: DataQueryResponse, options: DataQueryRequest<CloudWatchQuery>) {
......
import '../datasource';
import { CloudWatchDatasource, MAX_ATTEMPTS } from '../datasource';
import * as redux from 'app/store/store';
import { DataFrame, DataQueryResponse, DataSourceInstanceSettings, dateMath, getFrameDisplayName } from '@grafana/data';
import {
DataFrame,
DataQueryResponse,
DataSourceInstanceSettings,
dateMath,
getFrameDisplayName,
DataQueryErrorType,
} from '@grafana/data';
import { TemplateSrv } from 'app/features/templating/template_srv';
import { CloudWatchLogsQueryStatus, CloudWatchMetricsQuery, CloudWatchQuery, LogAction } from '../types';
import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__
import { TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { convertToStoreState } from '../../../../../test/helpers/convertToStoreState';
import { getTemplateSrvDependencies } from 'test/helpers/getTemplateSrvDependencies';
import { of } from 'rxjs';
import { of, interval } from 'rxjs';
import { CustomVariableModel, VariableHide } from '../../../../features/variables/types';
import * as rxjsUtils from '../utils/rxjs/increasingInterval';
jest.mock('rxjs/operators', () => {
const operators = jest.requireActual('rxjs/operators');
operators.delay = jest.fn(() => (s: any) => s);
......@@ -113,6 +122,10 @@ describe('CloudWatchDatasource', () => {
});
describe('When performing CloudWatch logs query', () => {
beforeEach(() => {
jest.spyOn(rxjsUtils, 'increasingInterval').mockImplementation(() => interval(100));
});
it('should add data links to response', () => {
const mockResponse: DataQueryResponse = {
data: [
......@@ -164,13 +177,26 @@ describe('CloudWatchDatasource', () => {
});
});
it('should stop querying when no more data retrieved past max attempts', async () => {
const fakeFrames = genMockFrames(10);
for (let i = 7; i < fakeFrames.length; i++) {
it('should stop querying when no more data received a number of times in a row', async () => {
const fakeFrames = genMockFrames(20);
const initialRecordsMatched = fakeFrames[0].meta!.stats!.find(stat => stat.displayName === 'Records matched')!
.value!;
for (let i = 1; i < 4; i++) {
fakeFrames[i].meta!.stats = [
{
displayName: 'Records matched',
value: initialRecordsMatched,
},
];
}
const finalRecordsMatched = fakeFrames[9].meta!.stats!.find(stat => stat.displayName === 'Records matched')!
.value!;
for (let i = 10; i < fakeFrames.length; i++) {
fakeFrames[i].meta!.stats = [
{
displayName: 'Records matched',
value: fakeFrames[6].meta!.stats?.find(stat => stat.displayName === 'Records matched')?.value!,
value: finalRecordsMatched,
},
];
}
......@@ -190,22 +216,26 @@ describe('CloudWatchDatasource', () => {
const expectedData = [
{
...fakeFrames[MAX_ATTEMPTS - 1],
...fakeFrames[14],
meta: {
custom: {
...fakeFrames[MAX_ATTEMPTS - 1].meta!.custom,
Status: 'Complete',
Status: 'Cancelled',
},
stats: fakeFrames[MAX_ATTEMPTS - 1].meta!.stats,
stats: fakeFrames[14].meta!.stats,
},
},
];
expect(myResponse).toEqual({
data: expectedData,
key: 'test-key',
state: 'Done',
error: {
type: DataQueryErrorType.Timeout,
message: `error: query timed out after ${MAX_ATTEMPTS} attempts`,
},
});
expect(i).toBe(MAX_ATTEMPTS);
expect(i).toBe(15);
});
it('should continue querying as long as new data is being received', async () => {
......@@ -1113,6 +1143,7 @@ function genMockFrames(numResponses: number): DataFrame[] {
},
],
},
refId: 'A',
length: 0,
});
}
......
import { SchedulerLike, Observable, SchedulerAction, Subscriber, asyncScheduler } from 'rxjs';
/**
* Creates an Observable that emits sequential numbers after increasing intervals of time
* starting with `startPeriod`, ending with `endPeriod` and incrementing by `step`.
*/
export const increasingInterval = (
{ startPeriod = 0, endPeriod = 5000, step = 1000 },
scheduler: SchedulerLike = asyncScheduler
): Observable<number> => {
return new Observable<number>(subscriber => {
subscriber.add(
scheduler.schedule(dispatch, startPeriod, { subscriber, counter: 0, period: startPeriod, step, endPeriod })
);
return subscriber;
});
};
function dispatch(this: SchedulerAction<IntervalState>, state: IntervalState) {
const { subscriber, counter, period, step, endPeriod } = state;
subscriber.next(counter);
const newPeriod = Math.min(period + step, endPeriod);
this.schedule({ subscriber, counter: counter + 1, period: newPeriod, step, endPeriod }, newPeriod);
}
interface IntervalState {
subscriber: Subscriber<number>;
counter: number;
period: number;
endPeriod: number;
step: number;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment