Commit 4d893520 by Chavee Issariyapat

redesign subscription flow

parent f6206ec0
...@@ -5,32 +5,65 @@ const DEBUG_SUBLIST = false; ...@@ -5,32 +5,65 @@ const DEBUG_SUBLIST = false;
const events = require('events'); const events = require('events');
const cache = require('./cache'); const cache = require('./cache');
const RESETDELAY = 1000
const Coordinator = function(param={}) { const Coordinator = function(param={}) {
let that = this; let that = this;
let resettimer = 0;
this.events = new events.EventEmitter; this.events = new events.EventEmitter;
this.remoteclient = param.remoteclient; this.remoteclient = param.remoteclient;
this.flowemitter = param.flowemitter || null; this.flowemitter = param.flowemitter || null;
// function resetSubscription() {
// }
// function checkupSubscription() {
// let nodeList = that.flowemitter.getDeviceList();
// let sublist = {};
// let topic = '';
// function safeSub(topic) {
// if (topic && !sublist[topic]) {
// that.remoteclient.subscribe(topic);
// sublist[topic] = true;
// }
// }
// for (let nodeid in nodeList) {
// if (!nodeList[nodeid].subbbed) {
// safeSub(`@tap/shadow/updated/${deviceid}:${devicetoken}`);
// safeSub(`@tap/device/changed/${deviceid}:${devicetoken}`);
// let tp;
// for (let i=0; i<nodeList[nodeid].msgtopic.length; i++) {
// safeSub(nodeList[nodeid].msgtopic[i]);
// }
// nodeList[nodeid].subbbed = true;
// }
// }
// }
this.flowemitter.on('newclient', function(client){ this.flowemitter.on('newclient', function(client){
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
let nodeList = that.flowemitter.getDeviceList(); //let nodeList = that.flowemitter.getDeviceList();
// if (nodeList[client.nodeid] && !nodeList[client.nodeid].subbed ) {
if (nodeList[client.nodeid] && !nodeList[client.nodeid].subbed ) { // nodeList[client.nodeid].subbed = true;
nodeList[client.nodeid].subbed = true; // let deviceid = nodeList[client.nodeid].deviceid;
// let devicetoken = nodeList[client.nodeid].devicetoken;
let deviceid = nodeList[client.nodeid].deviceid;
let devicetoken = nodeList[client.nodeid].devicetoken;
that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${devicetoken}`); that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${devicetoken}`);
that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${devicetoken}`); that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${devicetoken}`);
that.remoteclient.publish(`@tap/device/get/${deviceid}:${devicetoken}`); // that.remoteclient.publish(`@tap/device/get/${deviceid}:${devicetoken}`);
that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${devicetoken}`); // that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${devicetoken}`);
}
//}
if (DEBUG_SUBLIST) { if (DEBUG_SUBLIST) {
console.log(`\nshadow client #${deviceid} connected -->`) console.log(`\nshadow client #${deviceid} connected -->`)
...@@ -65,7 +98,6 @@ const Coordinator = function(param={}) { ...@@ -65,7 +98,6 @@ const Coordinator = function(param={}) {
outmsg = payload.toString(); outmsg = payload.toString();
that.remoteclient.publish(outtopic, outmsg); that.remoteclient.publish(outtopic, outmsg);
} }
} }
}); });
...@@ -73,20 +105,29 @@ const Coordinator = function(param={}) { ...@@ -73,20 +105,29 @@ const Coordinator = function(param={}) {
this.flowemitter.on('flowsub', function(topic, client) { this.flowemitter.on('flowsub', function(topic, client) {
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
// if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker
if (topic.startsWith('@local/msgin/')) { if (topic.startsWith('@local/msgin/')) {
let msgpart = topic.split('/').splice(4).join('/'); let msgpart = topic.split('/').splice(4).join('/');
outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${msgpart}`; outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${msgpart}`;
that.remoteclient.subscribe(outtopic); that.remoteclient.subscribe(outtopic);
} }
else { else {
} }
}); });
this.flowemitter.on('startcountdownreset', function() {
if (!resettimer) {
resettimer = setTimeout(function() {
console.log(' [info] MQTT resetting...');
that.remoteclient.resetbroker();
}, RESETDELAY);
}
});
this.remoteclient.on('connect', function() { this.remoteclient.on('connect', function() {
console.log(' [info] Connected to MQTT broker.') console.log(' [info] Connected to MQTT broker.')
that.remoteclient.subscribe('@private/#'); that.remoteclient.subscribe('@private/#');
that.emit('mqttconnected');
// setTimeout(function() { // setTimeout(function() {
// for (let deviceid in that.flowemitter.sublist) { // for (let deviceid in that.flowemitter.sublist) {
...@@ -101,7 +142,6 @@ const Coordinator = function(param={}) { ...@@ -101,7 +142,6 @@ const Coordinator = function(param={}) {
this.remoteclient.on('close', function() { this.remoteclient.on('close', function() {
console.log(' [info] Disconnect from MQTT broker.') console.log(' [info] Disconnect from MQTT broker.')
that.emit('mqttdisconnected');
}); });
this.remoteclient.on('message', function(topic, payload){ this.remoteclient.on('message', function(topic, payload){
...@@ -136,7 +176,7 @@ const Coordinator = function(param={}) { ...@@ -136,7 +176,7 @@ const Coordinator = function(param={}) {
cache.setStatus(jsonpayload.deviceid, jsonpayload); cache.setStatus(jsonpayload.deviceid, jsonpayload);
that.flowemitter.pub(`flow:device:${jsonpayload.deviceid}`, { that.flowemitter.pub(`flow:device:${jsonpayload.deviceid}`, {
deviceget : jsonpayload deviceget : jsonpayload
}); })
} }
else if (topic.startsWith('@msg/')) { else if (topic.startsWith('@msg/')) {
that.flowemitter.pub(`flow:msg`, { that.flowemitter.pub(`flow:msg`, {
......
...@@ -20,6 +20,10 @@ let FlowAgent = function(option = {}) { ...@@ -20,6 +20,10 @@ let FlowAgent = function(option = {}) {
red : option.red || null red : option.red || null
} }
if (this.option.red) {
this.option.red.events.setMaxListeners(100);
}
this.remoteclient = MQTTClient.create({ this.remoteclient = MQTTClient.create({
host: that.option.broker_uri, host: that.option.broker_uri,
options: { options: {
......
...@@ -5,8 +5,8 @@ const events = require('events'); ...@@ -5,8 +5,8 @@ const events = require('events');
const FlowEmitter = function(param = {}) { const FlowEmitter = function(param = {}) {
this.red = param.red; // node-red object this.red = param.red; // node-red object
this.devicelist = {}; //this.devicelist = {};
this.sublist = {}; //this.sublist = {};
this.globalcontext = param.globalcontext; this.globalcontext = param.globalcontext;
this.events = new events.EventEmitter; this.events = new events.EventEmitter;
...@@ -17,19 +17,22 @@ FlowEmitter.prototype.start = function() { ...@@ -17,19 +17,22 @@ FlowEmitter.prototype.start = function() {
this.red.events.on('register', function(client) { this.red.events.on('register', function(client) {
that.devicelist[client.nodeid] = { // that.devicelist[client.nodeid] = {
deviceid : client.deviceid, // deviceid : client.deviceid,
devicetoken: client.devicetoken, // devicetoken: client.devicetoken,
msgtopic : [] // msgtopic : [],
} // subbed : false
// }
that.emit('newclient', client); that.emit('newclient', client);
}); });
this.red.events.on('deregister', function(client) { this.red.events.on('deregister', function(client) {
delete that.devicelist[client.nodeid]; //delete that.devicelist[client.nodeid];
that.emit('startcountdownreset');
}); });
this.red.events.on('flowsub', function(topic, client) { this.red.events.on('flowsub', function(topic, client) {
//that.devicelist[client.nodeid].msgtopic.push(topic);
that.emit('flowsub', topic, client); that.emit('flowsub', topic, client);
}); });
...@@ -39,9 +42,16 @@ FlowEmitter.prototype.start = function() { ...@@ -39,9 +42,16 @@ FlowEmitter.prototype.start = function() {
} }
FlowEmitter.prototype.getDeviceList = function() { // FlowEmitter.prototype.getDeviceList = function() {
return this.devicelist; // return this.devicelist;
} // }
// FlowEmitter.prototype.clearDeviceSubscriptionState = function() {
// for (let nodeid in this.devicelist) {
// this.devicelist[nodeid].subbed = false;
// }
// }
FlowEmitter.prototype.pub = function(topic, payload) { FlowEmitter.prototype.pub = function(topic, payload) {
this.red.events.emit(topic, payload); this.red.events.emit(topic, payload);
......
...@@ -9,13 +9,22 @@ const MQTTClient = function(param = {}) { ...@@ -9,13 +9,22 @@ const MQTTClient = function(param = {}) {
this.options = param.options; this.options = param.options;
} }
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
MQTTClient.prototype = new events.EventEmitter; MQTTClient.prototype = new events.EventEmitter;
MQTTClient.prototype.connect = function() { MQTTClient.prototype.connect = function() {
var that = this; var that = this;
this.subscription = [];
this.client = MQTT.connect(this.host, this.options || {}); this.client = MQTT.connect(this.host, this.options || {});
this.client.on('connect', function() { this.client.on('connect', async function() {
for (let topic in that.subscription) {
await sleep(100);
}
that.emit('connect'); that.emit('connect');
}); });
...@@ -38,9 +47,19 @@ MQTTClient.prototype.connect = function() { ...@@ -38,9 +47,19 @@ MQTTClient.prototype.connect = function() {
} }
MQTTClient.prototype.resetbroker = function(topic, payload) {
this.client.end();
//this.subscription = {};
this.client.reconnect();
}
MQTTClient.prototype.publish = function(topic, payload) { MQTTClient.prototype.publish = function(topic, payload) {
if (this.client) { if (this.client) {
console.log(' mqtt pub --> '+topic); //console.log(' mqtt pub --> '+topic);
if (this.subscription[topic]) this.subscription[topic]++;
else this.subscription[topic] = 1;
this.client.publish(topic, payload); this.client.publish(topic, payload);
} }
else { else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment