Commit 589b5e11 by Ivana Huckova Committed by GitHub

Loki: Retry web socket connection when connection is closed abnormally (#29438)

* Add retryWhen to Loki socket

* Refactor

* Add info message after 10 reconnections, remove console log

* Add test

* Specify response in tests

* Check specific lines for content

* Remove unused imports

* Replace AppEvents with console.warn

* Add limit of re-connections
parent 56e7ce6f
...@@ -259,7 +259,7 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> { ...@@ -259,7 +259,7 @@ export class LokiDatasource extends DataSourceApi<LokiQuery, LokiOptions> {
return this.streams.getStream(liveTarget).pipe( return this.streams.getStream(liveTarget).pipe(
map(data => ({ map(data => ({
data, data: data || [],
key: `loki-${liveTarget.refId}`, key: `loki-${liveTarget.refId}`,
state: LoadingState.Streaming, state: LoadingState.Streaming,
})) }))
......
import { Observable, Subject } from 'rxjs'; import { Observable, Subject, of, throwError, concat } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import * as rxJsWebSocket from 'rxjs/webSocket'; import * as rxJsWebSocket from 'rxjs/webSocket';
import { LiveStreams } from './live_streams'; import { LiveStreams } from './live_streams';
import { DataFrame, DataFrameView, formatLabels, Labels } from '@grafana/data'; import { DataFrame, DataFrameView, formatLabels, Labels } from '@grafana/data';
...@@ -105,6 +106,51 @@ describe('Live Stream Tests', () => { ...@@ -105,6 +106,51 @@ describe('Live Stream Tests', () => {
subscription.unsubscribe(); subscription.unsubscribe();
expect(unsubscribed).toBe(true); expect(unsubscribed).toBe(true);
}); });
/**
 * Verifies that the live stream re-subscribes to the socket after an error
 * carrying code 1006, and that log lines received before and after the
 * reconnect both show up in the resulting data frame.
 */
it('should reconnect when abnormal error', async () => {
  // Builds a one-entry tail response whose only log line is `line`.
  const tailResponseWith = (line: string) =>
    of({
      streams: [
        {
          stream: { filename: '/var/log/sntpc.log', job: 'varlogs' },
          values: [['1567025440118944705', line]],
        },
      ],
      dropped_entries: null,
    });

  const responseBeforeError = tailResponseWith('Kittens');
  const responseAfterError = tailResponseWith('Doggos');

  // Error tagged with code 1006, the code the stream treats as "closed abnormally".
  const closedAbnormally = Object.assign(new Error('weird error'), { code: 1006 });
  const failingStream = throwError(closedAbnormally);

  // Counts how many times the fake socket has been subscribed to.
  let retries = 0;
  fakeSocket = of({}).pipe(
    mergeMap(() => {
      const isFirstSubscription = retries === 0;
      retries += 1;
      // First subscription: emit one response, then fail.
      // Any re-subscription after the abnormal error: emit just the second response.
      return isFirstSubscription ? concat(responseBeforeError, failingStream) : responseAfterError;
    })
  ) as any;

  const liveStreams = new LiveStreams();
  // Second argument is the retry interval, kept short (100) so the test does not wait the default delay.
  const stream$ = liveStreams.getStream(makeTarget('url_to_match'), 100);

  await expect(stream$).toEmitValuesWith(received => {
    // NOTE(review): both lines are read from the first emission; this presumably works because
    // every emission carries the same buffered frame that later responses are appended to — confirm.
    const [firstEmission] = received;
    const view = new DataFrameView(firstEmission[0]);
    // Copy each row before reading it, as the original assertions did.
    const lineAt = (row: number) => ({ ...view.get(row) }).line;

    expect(lineAt(0)).toBe('Kittens');
    expect(lineAt(1)).toBe('Doggos');
    expect(retries).toBe(2);
  });
});
}); });
/** /**
......
import { DataFrame, FieldType, parseLabels, KeyValue, CircularDataFrame } from '@grafana/data'; import { DataFrame, FieldType, parseLabels, KeyValue, CircularDataFrame } from '@grafana/data';
import { Observable, throwError } from 'rxjs'; import { Observable, throwError, timer } from 'rxjs';
import { webSocket } from 'rxjs/webSocket'; import { webSocket } from 'rxjs/webSocket';
import { LokiTailResponse } from './types'; import { LokiTailResponse } from './types';
import { finalize, map, catchError } from 'rxjs/operators'; import { finalize, map, retryWhen, mergeMap } from 'rxjs/operators';
import { appendResponseToBufferedData } from './result_transformer'; import { appendResponseToBufferedData } from './result_transformer';
/** /**
...@@ -22,7 +22,7 @@ export interface LokiLiveTarget { ...@@ -22,7 +22,7 @@ export interface LokiLiveTarget {
export class LiveStreams { export class LiveStreams {
private streams: KeyValue<Observable<DataFrame[]>> = {}; private streams: KeyValue<Observable<DataFrame[]>> = {};
getStream(target: LokiLiveTarget): Observable<DataFrame[]> { getStream(target: LokiLiveTarget, retryInterval = 5000): Observable<DataFrame[]> {
let stream = this.streams[target.url]; let stream = this.streams[target.url];
if (stream) { if (stream) {
...@@ -42,9 +42,27 @@ export class LiveStreams { ...@@ -42,9 +42,27 @@ export class LiveStreams {
appendResponseToBufferedData(response, data); appendResponseToBufferedData(response, data);
return [data]; return [data];
}), }),
catchError(err => { retryWhen((attempts: Observable<any>) =>
return throwError(`error: ${err.reason}`); attempts.pipe(
}), mergeMap((error, i) => {
const retryAttempt = i + 1;
// Code 1006 is used to indicate that a connection was closed abnormally.
// Hard limit on retries: the 30th error is no longer retried.
// Retry only if the connection was closed abnormally and the limit has not been reached; otherwise throw the error.
if (error.code === 1006 && retryAttempt < 30) {
if (retryAttempt > 10) {
// After more than 10 retries, console.warn but keep reconnecting
console.warn(
`Websocket connection is being disrupted. We keep reconnecting but consider starting new live tailing again. Error: ${error.reason}`
);
}
// Retry after retryInterval ms (defaults to 5000, i.e. 5s)
return timer(retryInterval);
}
return throwError(`error: ${error.reason}`);
})
)
),
finalize(() => { finalize(() => {
delete this.streams[target.url]; delete this.streams[target.url];
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment