Commit 726bb447 by Ryan McKinley Committed by GitHub

Live: cleanup and simple changes (#28028)

parent 0ffd9a9a
...@@ -3346,7 +3346,7 @@ ...@@ -3346,7 +3346,7 @@
# # pid_finder = "pgrep" # # pid_finder = "pgrep"
# # Reads last_run_summary.yaml file and converts to measurments # # Reads last_run_summary.yaml file and converts to measurements
# [[inputs.puppetagent]] # [[inputs.puppetagent]]
# ## Location of puppet last run summary file # ## Location of puppet last run summary file
# location = "/var/lib/puppet/state/last_run_summary.yaml" # location = "/var/lib/puppet/state/last_run_summary.yaml"
......
...@@ -146,18 +146,21 @@ export interface LiveChannelPresenceStatus { ...@@ -146,18 +146,21 @@ export interface LiveChannelPresenceStatus {
/** /**
* @experimental * @experimental
*/ */
export interface LiveChannelAddress {
scope: LiveChannelScope;
namespace: string; // depends on the scope
path: string;
}
/**
* @experimental
*/
export interface LiveChannel<TMessage = any, TPublish = any> { export interface LiveChannel<TMessage = any, TPublish = any> {
/** The fully qualified channel id: ${scope}/${namespace}/${path} */ /** The fully qualified channel id: ${scope}/${namespace}/${path} */
id: string; id: string;
/** The scope for this channel */ /** The channel address */
scope: LiveChannelScope; addr: LiveChannelAddress;
/** datasourceId/plugin name/feature depending on scope */
namespace: string;
/** additional qualifier */
path: string;
/** Unix timestamp for when the channel connected */ /** Unix timestamp for when the channel connected */
opened: number; opened: number;
......
import { parseLabels, formatLabels, findCommonLabels, findUniqueLabels } from './labels'; import { parseLabels, formatLabels, findCommonLabels, findUniqueLabels, matchAllLabels } from './labels';
import { Labels } from '../types/data';
describe('parseLabels()', () => { describe('parseLabels()', () => {
it('returns no labels on empty labels string', () => { it('returns no labels on empty labels string', () => {
...@@ -53,3 +54,25 @@ describe('findUniqueLabels()', () => { ...@@ -53,3 +54,25 @@ describe('findUniqueLabels()', () => {
expect(findUniqueLabels({ foo: '"bar"', baz: '"42"' }, { foo: '"bar"' })).toEqual({ baz: '"42"' }); expect(findUniqueLabels({ foo: '"bar"', baz: '"42"' }, { foo: '"bar"' })).toEqual({ baz: '"42"' });
}); });
}); });
describe('matchAllLabels()', () => {
it('empty labels do math', () => {
expect(matchAllLabels({}, {})).toBeTruthy();
});
it('missing labels', () => {
expect(matchAllLabels({ foo: 'bar' }, {})).toBeFalsy();
});
it('extra labels should match', () => {
expect(matchAllLabels({ foo: 'bar' }, { foo: 'bar', baz: '22' })).toBeTruthy();
});
it('be graceful with null values (match)', () => {
expect(matchAllLabels({ foo: 'bar' })).toBeFalsy();
});
it('be graceful with null values (match)', () => {
expect(matchAllLabels((undefined as unknown) as Labels, { foo: 'bar' })).toBeTruthy();
});
});
...@@ -60,6 +60,21 @@ export function findUniqueLabels(labels: Labels | undefined, commonLabels: Label ...@@ -60,6 +60,21 @@ export function findUniqueLabels(labels: Labels | undefined, commonLabels: Label
} }
/** /**
* Check that all labels exist in another set of labels
*/
export function matchAllLabels(expect: Labels, against?: Labels): boolean {
if (!expect) {
return true; // nothing to match
}
for (const [key, value] of Object.entries(expect)) {
if (!against || against[key] !== value) {
return false;
}
}
return true;
}
/**
* Serializes the given labels to a string. * Serializes the given labels to a string.
*/ */
export function formatLabels(labels: Labels, defaultValue = '', withoutBraces?: boolean): string { export function formatLabels(labels: Labels, defaultValue = '', withoutBraces?: boolean): string {
......
import { LiveChannel, LiveChannelScope } from '@grafana/data'; import { LiveChannel, LiveChannelAddress } from '@grafana/data';
import { Observable } from 'rxjs'; import { Observable } from 'rxjs';
/** /**
...@@ -23,11 +23,7 @@ export interface GrafanaLiveSrv { ...@@ -23,11 +23,7 @@ export interface GrafanaLiveSrv {
* Multiple requests for this channel will return the same object until * Multiple requests for this channel will return the same object until
* the channel is shutdown * the channel is shutdown
*/ */
getChannel<TMessage, TPublish>( getChannel<TMessage, TPublish = any>(address: LiveChannelAddress): LiveChannel<TMessage, TPublish>;
scope: LiveChannelScope,
namespace: string,
path: string
): LiveChannel<TMessage, TPublish>;
} }
let singletonInstance: GrafanaLiveSrv; let singletonInstance: GrafanaLiveSrv;
......
...@@ -426,7 +426,7 @@ func (hs *HTTPServer) registerRoutes() { ...@@ -426,7 +426,7 @@ func (hs *HTTPServer) registerRoutes() {
// Live streaming // Live streaming
if hs.Live != nil { if hs.Live != nil {
r.Any("/live/*", hs.Live.Handler) r.Any("/live/*", hs.Live.WebsocketHandler)
} }
// Snapshots // Snapshots
......
...@@ -31,7 +31,7 @@ type GrafanaLive struct { ...@@ -31,7 +31,7 @@ type GrafanaLive struct {
node *centrifuge.Node node *centrifuge.Node
// The websocket handler // The websocket handler
Handler interface{} WebsocketHandler interface{}
// Full channel handler // Full channel handler
channels map[string]models.ChannelHandler channels map[string]models.ChannelHandler
...@@ -171,7 +171,7 @@ func InitializeBroker() (*GrafanaLive, error) { ...@@ -171,7 +171,7 @@ func InitializeBroker() (*GrafanaLive, error) {
WriteBufferSize: 1024, WriteBufferSize: 1024,
}) })
glive.Handler = func(ctx *models.ReqContext) { glive.WebsocketHandler = func(ctx *models.ReqContext) {
user := ctx.SignedInUser user := ctx.SignedInUser
if user == nil { if user == nil {
ctx.Resp.WriteHeader(401) ctx.Resp.WriteHeader(401)
......
...@@ -62,7 +62,7 @@ export class LivePanel extends PureComponent<Props, State> { ...@@ -62,7 +62,7 @@ export class LivePanel extends PureComponent<Props, State> {
startSubscription = () => { startSubscription = () => {
const { scope, namespace, path } = this.props; const { scope, namespace, path } = this.props;
const channel = getGrafanaLiveSrv().getChannel(scope, namespace, path); const channel = getGrafanaLiveSrv().getChannel({ scope, namespace, path });
if (this.state.channel === channel) { if (this.state.channel === channel) {
return; // no change! return; // no change!
} }
......
import { import {
LiveChannelConfig, LiveChannelConfig,
LiveChannel, LiveChannel,
LiveChannelScope,
LiveChannelStatusEvent, LiveChannelStatusEvent,
LiveChannelEvent, LiveChannelEvent,
LiveChannelEventType, LiveChannelEventType,
LiveChannelConnectionState, LiveChannelConnectionState,
LiveChannelPresenceStatus, LiveChannelPresenceStatus,
LiveChannelAddress,
} from '@grafana/data'; } from '@grafana/data';
import Centrifuge, { import Centrifuge, {
JoinLeaveContext, JoinLeaveContext,
...@@ -26,9 +26,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li ...@@ -26,9 +26,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
readonly opened = Date.now(); readonly opened = Date.now();
readonly id: string; readonly id: string;
readonly scope: LiveChannelScope; readonly addr: LiveChannelAddress;
readonly namespace: string;
readonly path: string;
readonly stream = new Subject<LiveChannelEvent<TMessage>>(); readonly stream = new Subject<LiveChannelEvent<TMessage>>();
...@@ -37,11 +35,9 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li ...@@ -37,11 +35,9 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
subscription?: Centrifuge.Subscription; subscription?: Centrifuge.Subscription;
shutdownCallback?: () => void; shutdownCallback?: () => void;
constructor(id: string, scope: LiveChannelScope, namespace: string, path: string) { constructor(id: string, addr: LiveChannelAddress) {
this.id = id; this.id = id;
this.scope = scope; this.addr = addr;
this.namespace = namespace;
this.path = path;
this.currentStatus = { this.currentStatus = {
type: LiveChannelEventType.Status, type: LiveChannelEventType.Status,
id, id,
...@@ -61,15 +57,25 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li ...@@ -61,15 +57,25 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
const events: SubscriptionEvents = { const events: SubscriptionEvents = {
// This means a message was received from the server // This means a message was received from the server
publish: (ctx: PublicationContext) => { publish: (ctx: PublicationContext) => {
this.stream.next({ try {
type: LiveChannelEventType.Message, const message = prepare(ctx.data);
message: prepare(ctx.data), if (message) {
}); this.stream.next({
type: LiveChannelEventType.Message,
// Clear any error messages message,
if (this.currentStatus.error) { });
}
// Clear any error messages
if (this.currentStatus.error) {
this.currentStatus.timestamp = Date.now();
delete this.currentStatus.error;
this.sendStatus();
}
} catch (err) {
console.log('publish error', config.path, err);
this.currentStatus.error = err;
this.currentStatus.timestamp = Date.now(); this.currentStatus.timestamp = Date.now();
delete this.currentStatus.error;
this.sendStatus(); this.sendStatus();
} }
}, },
...@@ -81,6 +87,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li ...@@ -81,6 +87,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
subscribe: (ctx: SubscribeSuccessContext) => { subscribe: (ctx: SubscribeSuccessContext) => {
this.currentStatus.timestamp = Date.now(); this.currentStatus.timestamp = Date.now();
this.currentStatus.state = LiveChannelConnectionState.Connected; this.currentStatus.state = LiveChannelConnectionState.Connected;
delete this.currentStatus.error;
this.sendStatus(); this.sendStatus();
}, },
unsubscribe: (ctx: UnsubscribeContext) => { unsubscribe: (ctx: UnsubscribeContext) => {
...@@ -159,19 +166,11 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li ...@@ -159,19 +166,11 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
} }
} }
export function getErrorChannel( export function getErrorChannel(msg: string, id: string, addr: LiveChannelAddress): LiveChannel {
msg: string,
id: string,
scope: LiveChannelScope,
namespace: string,
path: string
): LiveChannel {
return { return {
id, id,
opened: Date.now(), opened: Date.now(),
scope, addr,
namespace,
path,
// return an error // return an error
getStream: () => getStream: () =>
......
...@@ -56,7 +56,11 @@ class DashboardWatcher { ...@@ -56,7 +56,11 @@ class DashboardWatcher {
// Check for changes // Check for changes
if (uid !== this.uid) { if (uid !== this.uid) {
this.leave(); this.leave();
this.channel = live.getChannel(LiveChannelScope.Grafana, 'dashboard', uid); this.channel = live.getChannel({
scope: LiveChannelScope.Grafana,
namespace: 'dashboard',
path: uid,
});
this.channel.getStream().subscribe(this.observer); this.channel.getStream().subscribe(this.observer);
this.uid = uid; this.uid = uid;
} }
......
...@@ -2,7 +2,7 @@ import Centrifuge from 'centrifuge/dist/centrifuge.protobuf'; ...@@ -2,7 +2,7 @@ import Centrifuge from 'centrifuge/dist/centrifuge.protobuf';
import SockJS from 'sockjs-client'; import SockJS from 'sockjs-client';
import { GrafanaLiveSrv, setGrafanaLiveSrv, getGrafanaLiveSrv, config } from '@grafana/runtime'; import { GrafanaLiveSrv, setGrafanaLiveSrv, getGrafanaLiveSrv, config } from '@grafana/runtime';
import { BehaviorSubject } from 'rxjs'; import { BehaviorSubject } from 'rxjs';
import { LiveChannel, LiveChannelScope } from '@grafana/data'; import { LiveChannel, LiveChannelScope, LiveChannelAddress } from '@grafana/data';
import { CentrifugeLiveChannel, getErrorChannel } from './channel'; import { CentrifugeLiveChannel, getErrorChannel } from './channel';
import { import {
GrafanaLiveScope, GrafanaLiveScope,
...@@ -84,23 +84,19 @@ export class CentrifugeSrv implements GrafanaLiveSrv { ...@@ -84,23 +84,19 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
* Get a channel. If the scope, namespace, or path is invalid, a shutdown * Get a channel. If the scope, namespace, or path is invalid, a shutdown
* channel will be returned with an error state indicated in its status * channel will be returned with an error state indicated in its status
*/ */
getChannel<TMessage, TPublish>( getChannel<TMessage, TPublish = any>(addr: LiveChannelAddress): LiveChannel<TMessage, TPublish> {
scopeId: LiveChannelScope, const id = `${addr.scope}/${addr.namespace}/${addr.path}`;
namespace: string,
path: string
): LiveChannel<TMessage, TPublish> {
const id = `${scopeId}/${namespace}/${path}`;
let channel = this.open.get(id); let channel = this.open.get(id);
if (channel != null) { if (channel != null) {
return channel; return channel;
} }
const scope = this.scopes[scopeId]; const scope = this.scopes[addr.scope];
if (!scope) { if (!scope) {
return getErrorChannel('invalid scope', id, scopeId, namespace, path); return getErrorChannel('invalid scope', id, addr);
} }
channel = new CentrifugeLiveChannel(id, scopeId, namespace, path); channel = new CentrifugeLiveChannel(id, addr);
channel.shutdownCallback = () => { channel.shutdownCallback = () => {
this.open.delete(id); // remove it from the list of open channels this.open.delete(id); // remove it from the list of open channels
}; };
...@@ -117,13 +113,14 @@ export class CentrifugeSrv implements GrafanaLiveSrv { ...@@ -117,13 +113,14 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
} }
private async initChannel(scope: GrafanaLiveScope, channel: CentrifugeLiveChannel): Promise<void> { private async initChannel(scope: GrafanaLiveScope, channel: CentrifugeLiveChannel): Promise<void> {
const support = await scope.getChannelSupport(channel.namespace); const { addr } = channel;
const support = await scope.getChannelSupport(addr.namespace);
if (!support) { if (!support) {
throw new Error(channel.namespace + 'does not support streaming'); throw new Error(channel.addr.namespace + 'does not support streaming');
} }
const config = support.getChannelConfig(channel.path); const config = support.getChannelConfig(addr.path);
if (!config) { if (!config) {
throw new Error('unknown path: ' + channel.path); throw new Error('unknown path: ' + addr.path);
} }
if (config.canPublish?.()) { if (config.canPublish?.()) {
channel.publish = (data: any) => this.centrifuge.publish(channel.id, data); channel.publish = (data: any) => this.centrifuge.publish(channel.id, data);
......
...@@ -21,7 +21,7 @@ const samples: Array<SelectableValue<string>> = [ ...@@ -21,7 +21,7 @@ const samples: Array<SelectableValue<string>> = [
{ label: 'Show buckets', description: 'List the avaliable buckets (table)', value: 'buckets()' }, { label: 'Show buckets', description: 'List the avaliable buckets (table)', value: 'buckets()' },
{ {
label: 'Simple query', label: 'Simple query',
description: 'filter by measurment and field', description: 'filter by measurement and field',
value: `from(bucket: "db/rp") value: `from(bucket: "db/rp")
|> range(start: v.timeRangeStart, stop:v.timeRangeStop) |> range(start: v.timeRangeStart, stop:v.timeRangeStop)
|> filter(fn: (r) => |> filter(fn: (r) =>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment