Commit 2448426a by Chavee Issariyapat

fix duplicate listeners on mqttclient.on('connect')

parent 7a965d29
...@@ -7,22 +7,25 @@ const cache = require('./cache'); ...@@ -7,22 +7,25 @@ const cache = require('./cache');
const RESETDELAY = 1000 const RESETDELAY = 1000
const Coordinator = function(param={}) { const Coordinator = function(param={}) {
let that = this; let that = this;
let resettimer = 0; let resettimer = 0;
this.x = 0;
this.started = false; this.started = false;
this.events = new events.EventEmitter; this.events = new events.EventEmitter;
this.remoteclient = param.remoteclient; this.mqttclient = param.mqttclient;
this.flowemitter = param.flowemitter || null; this.flowemitter = param.flowemitter || null;
this.flowemitter.on('newclient', function(client){ this.flowemitter.on('newclient', function(client){
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${devicetoken}`); that.mqttclient.subscribe(`@tap/shadow/updated/${deviceid}:${devicetoken}`);
that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${devicetoken}`); that.mqttclient.subscribe(`@tap/device/changed/${deviceid}:${devicetoken}`);
if (DEBUG_SUBLIST) { if (DEBUG_SUBLIST) {
console.log(`\nshadow client #${deviceid} connected -->`) console.log(`\nshadow client #${deviceid} connected -->`)
...@@ -38,24 +41,24 @@ const Coordinator = function(param={}) { ...@@ -38,24 +41,24 @@ const Coordinator = function(param={}) {
case '@shadow/data/update' : case '@shadow/data/update' :
outtopic = `@tap/shadow/update/${deviceid}:${devicetoken}`; outtopic = `@tap/shadow/update/${deviceid}:${devicetoken}`;
outmsg = payload.toString(); outmsg = payload.toString();
that.remoteclient.publish(outtopic, outmsg); that.mqttclient.publish(outtopic, outmsg);
break; break;
case '@local/shadow/get' : case '@local/shadow/get' :
outtopic = `@tap/shadow/get/${deviceid}:${devicetoken}`; outtopic = `@tap/shadow/get/${deviceid}:${devicetoken}`;
outmsg = payload.toString(); outmsg = payload.toString();
that.remoteclient.publish(outtopic, outmsg); that.mqttclient.publish(outtopic, outmsg);
break; break;
case '@local/device/get' : case '@local/device/get' :
outtopic = `@tap/device/get/${deviceid}:${devicetoken}`; outtopic = `@tap/device/get/${deviceid}:${devicetoken}`;
outmsg = payload.toString(); outmsg = payload.toString();
that.remoteclient.publish(outtopic, outmsg); that.mqttclient.publish(outtopic, outmsg);
break; break;
default: default:
if (topic.startsWith('@local/msgout/')) { if (topic.startsWith('@local/msgout/')) {
let part = topic.substr(14); let part = topic.substr(14);
outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${part}`; outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${part}`;
outmsg = payload.toString(); outmsg = payload.toString();
that.remoteclient.publish(outtopic, outmsg); that.mqttclient.publish(outtopic, outmsg);
} }
} }
}); });
...@@ -68,7 +71,7 @@ const Coordinator = function(param={}) { ...@@ -68,7 +71,7 @@ const Coordinator = function(param={}) {
let msgpart = topic.split('/').splice(4).join('/'); let msgpart = topic.split('/').splice(4).join('/');
outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${msgpart}`; outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${msgpart}`;
that.remoteclient.subscribe(outtopic); that.mqttclient.subscribe(outtopic);
} }
else { else {
...@@ -79,22 +82,21 @@ const Coordinator = function(param={}) { ...@@ -79,22 +82,21 @@ const Coordinator = function(param={}) {
if (!resettimer) { if (!resettimer) {
resettimer = setTimeout(function() { resettimer = setTimeout(function() {
console.log(' [info] MQTT resetting...'); console.log(' [info] MQTT resetting...');
that.remoteclient.resetbroker(); that.mqttclient.resetbroker();
}, RESETDELAY); }, RESETDELAY);
} }
}); });
this.remoteclient.on('connect', function() { this.mqttclient.on('connect', function() {
console.log(' [info] Connected to MQTT broker.') that.mqttclient.subscribe('@private/#');
that.remoteclient.subscribe('@private/#');
}); });
this.remoteclient.on('close', function() { this.mqttclient.on('close', function() {
this.started = false; this.started = false;
console.log(' [info] Disconnect from MQTT broker.') console.log(' [info] Disconnect from MQTT broker.')
}); });
this.remoteclient.on('message', function(topic, payload){ this.mqttclient.on('message', function(topic, payload){
let jsonpayload = {}; let jsonpayload = {};
try { try {
jsonpayload = JSON.parse(payload.toString()); jsonpayload = JSON.parse(payload.toString());
...@@ -148,7 +150,7 @@ Coordinator.prototype.emit = function(eventname, data1, data2, data3) { ...@@ -148,7 +150,7 @@ Coordinator.prototype.emit = function(eventname, data1, data2, data3) {
Coordinator.prototype.start = function() { Coordinator.prototype.start = function() {
if (!this.started) { if (!this.started) {
this.started = true; this.started = true;
this.remoteclient.connect(); this.mqttclient.connect();
this.flowemitter.start(); this.flowemitter.start();
return true; return true;
} }
......
...@@ -3,16 +3,16 @@ module.exports.create = create ...@@ -3,16 +3,16 @@ module.exports.create = create
const MQTTClient = require('./mqttclient'); const MQTTClient = require('./mqttclient');
const MQTTServer = require('./mqttserver'); const MQTTServer = require('./mqttserver');
const FlowEmitter = require('./flowemitter'); const FlowEmitter = require('./flowemitter');
const Coordinatior = require('./coordinator');
const events = require('events'); const events = require('events');
let FlowAgent = function(option = {}) { let FlowAgent = function(option = {}) {
let that = this; let that = this;
const Coordinatior = require('./coordinator'); this.started = false;
this.events = new events.EventEmitter; this.events = new events.EventEmitter;
this.option = { this.option = {
broker_uri : option.broker_uri, broker_uri : option.broker_uri,
flowagentid : option.flowagentid, flowagentid : option.flowagentid,
...@@ -32,19 +32,19 @@ let FlowAgent = function(option = {}) { ...@@ -32,19 +32,19 @@ let FlowAgent = function(option = {}) {
keepalive: 30 keepalive: 30
} }
this.remoteclient = MQTTClient.create({ let mqttclient = MQTTClient.create({
host: that.option.broker_uri, host: that.option.broker_uri,
options: opt options: opt
}); });
this.flowemitter = FlowEmitter.create({ let flowemitter = FlowEmitter.create({
red: this.option.red, red: this.option.red,
namespace: this.option.namespace namespace: this.option.namespace
}); });
this.coordinator = Coordinatior.create({ this.coordinator = Coordinatior.create({
remoteclient: that.remoteclient, mqttclient: mqttclient,
flowemitter: that.flowemitter flowemitter: flowemitter
}); });
} }
...@@ -57,7 +57,8 @@ FlowAgent.prototype.emit = function(eventname, data1, data2, data3) { ...@@ -57,7 +57,8 @@ FlowAgent.prototype.emit = function(eventname, data1, data2, data3) {
} }
FlowAgent.prototype.start = function() { FlowAgent.prototype.start = function() {
if (this.coordinator.start()) { if (!this.started) {
this.started = this.coordinator.start();
console.log('Staring FlowAgent...') console.log('Staring FlowAgent...')
} }
} }
......
module.exports.create = create module.exports.create = create
const MQTT = require('mqtt'); const MQTT = require('mqtt');
const events = require('events'); const EventEmitter = require('events');
const MQTTClient = function(param = {}) { const MQTTClient = function(param = {}) {
this.client = null; this.client = null;
...@@ -12,19 +12,24 @@ const MQTTClient = function(param = {}) { ...@@ -12,19 +12,24 @@ const MQTTClient = function(param = {}) {
function sleep(ms) { function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms)); return new Promise(resolve => setTimeout(resolve, ms));
} }
MQTTClient.prototype = new events.EventEmitter;
//----
//const events = require('events');
//MQTTClient.prototype = new events.EventEmitter;
//MQTTClient.prototype = new EventEmitter();
//----
MQTTClient.prototype.__proto__ = EventEmitter.prototype;
MQTTClient.prototype.connect = function() { MQTTClient.prototype.connect = function() {
var that = this; var that = this;
this.subscription = []; if (this.client) delete this.client;
this.subscription = [];
this.client = MQTT.connect(this.host, this.options || {}); this.client = MQTT.connect(this.host, this.options || {});
this.client.on('connect', function() { this.client.on('connect', function() {
// for (let topic in that.subscription) {
// await sleep(100);
// }
that.emit('connect'); that.emit('connect');
}); });
...@@ -61,11 +66,9 @@ MQTTClient.prototype.publish = function(topic, payload) { ...@@ -61,11 +66,9 @@ MQTTClient.prototype.publish = function(topic, payload) {
} }
MQTTClient.prototype.subscribe = function(topic) { MQTTClient.prototype.subscribe = function(topic) {
if (this.client && this.client.connected) { this.client.subscribe(topic, function(err, granted){
this.client.subscribe(topic); //console.log({err, granted})
} });
else {
}
} }
MQTTClient.prototype.unsubscribe = function(topic) { MQTTClient.prototype.unsubscribe = function(topic) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment