Commit 73a790d8 by Chavee Issariyapat

refactor flow

parent beb2058d
module.exports.create = create module.exports.create = create
const DEBUG_SUBLIST = false; const DEBUG = false;
const events = require('events'); const events = require('events');
const cache = require('./cache'); const cache = require('./cache');
...@@ -22,30 +22,47 @@ const Coordinator = function(param={}) { ...@@ -22,30 +22,47 @@ const Coordinator = function(param={}) {
this.mqttclient = param.mqttclient; this.mqttclient = param.mqttclient;
this.flowemitter = param.flowemitter || null; this.flowemitter = param.flowemitter || null;
this.flowemitter.on('newshadowclient', function(client){ this.flowemitter.on('registershadowlistener', function(client){
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
console.log let outtopic1 = `@tap/shadow/updated/${deviceid}:${devicetoken}`;
if (that.topicstore.addTopic(outtopic1, client.nodeid)) {
if (DEBUG)console.log('----- subscribe : '+outtopic1)
that.mqttclient.subscribe(outtopic1);
}
that.mqttclient.subscribe(`@tap/shadow/updated/${deviceid}:${devicetoken}`); let outtopic2 = `@tap/device/changed/${deviceid}:${devicetoken}`;
that.mqttclient.subscribe(`@tap/device/changed/${deviceid}:${devicetoken}`); if (that.topicstore.addTopic(outtopic2, client.nodeid)) {
if (DEBUG) console.log('----- subscribe : '+outtopic2)
that.mqttclient.subscribe(outtopic2);
}
if (DEBUG_SUBLIST) { if (DEBUG) {
// console.log(`\nshadow client #${deviceid} connected -->`) console.log('registershadowlistener ---------------------------');
// console.log(that.flowemitter.sublist); console.log(that.topicstore.getRawData());
} }
}); });
this.flowemitter.on('newmsgclient', function(client){ this.flowemitter.on('deregistershadowlistener', function(client){
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
that.mqttclient.subscribe(`@tap/device/changed/${deviceid}:${devicetoken}`); let outtopic1 = `@tap/shadow/updated/${deviceid}:${devicetoken}`;
if (that.topicstore.delTopic(outtopic1, client.nodeid)) {
if (DEBUG)console.log('----- unsubscribe : '+outtopic1)
that.mqttclient.unsubscribe(outtopic1);
}
let outtopic2 = `@tap/device/changed/${deviceid}:${devicetoken}`;
if (that.topicstore.delTopic(outtopic2, client.nodeid)) {
if (DEBUG)console.log('----- unsubscribe : '+outtopic2)
that.mqttclient.unsubscribe(outtopic2);
}
if (DEBUG_SUBLIST) { if (DEBUG) {
// console.log(`\nshadow client #${deviceid} connected -->`) console.log('deregistershadowlistener ---------------------------');
// console.log(that.flowemitter.sublist); console.log(that.topicstore.getRawData());
} }
}); });
...@@ -80,17 +97,16 @@ console.log ...@@ -80,17 +97,16 @@ console.log
} }
}); });
this.flowemitter.on('flowsub', function(topic, client) { this.flowemitter.on('flowsub', function(topic, client) {
console.log(client.nodeid + ' flowsub --> '+topic) if (DEBUG) console.log(client.nodeid + ' flowsub --> '+topic)
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
if (topic.startsWith('@local/msgin/')) { if (topic.startsWith('@local/msgin/')) {
let msgpart = topic.split('/').splice(4).join('/'); let msgpart = topic.split('/').splice(4).join('/');
let touttopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${msgpart}`;
outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${msgpart}`;
if (that.topicstore.addTopic(outtopic, client.nodeid)) { if (that.topicstore.addTopic(outtopic, client.nodeid)) {
console.log('----- subscribe : '+outtopic)
that.mqttclient.subscribe(outtopic); that.mqttclient.subscribe(outtopic);
} }
} }
...@@ -99,7 +115,7 @@ console.log ...@@ -99,7 +115,7 @@ console.log
}); });
this.flowemitter.on('flowunsub', function(topic, client) { this.flowemitter.on('flowunsub', function(topic, client) {
console.log(client.nodeid + ' flowunsub --> '+topic) if (DEBUG) console.log(client.nodeid + ' flowunsub --> '+topic)
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
...@@ -107,7 +123,7 @@ console.log ...@@ -107,7 +123,7 @@ console.log
let msgpart = topic.split('/').splice(4).join('/'); let msgpart = topic.split('/').splice(4).join('/');
outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${msgpart}`; outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${msgpart}`;
if (that.topicstore.delTopic(outtopic, client.nodeid)) { if (that.topicstore.delTopic(outtopic, client.nodeid)) {
//console.log('----- unsubscribe : '+outtopic) console.log('----- unsubscribe : '+outtopic)
that.mqttclient.unsubscribe(outtopic); that.mqttclient.unsubscribe(outtopic);
} }
} }
...@@ -194,7 +210,6 @@ Coordinator.prototype.start = function() { ...@@ -194,7 +210,6 @@ Coordinator.prototype.start = function() {
else { else {
return false; return false;
} }
} }
function create(param) { function create(param) {
......
...@@ -13,11 +13,12 @@ FlowEmitter.prototype.start = function() { ...@@ -13,11 +13,12 @@ FlowEmitter.prototype.start = function() {
let that = this; let that = this;
this.red.events.on(`${that.namespace}-registershadowlistener`, function(client) { this.red.events.on(`${that.namespace}-registershadowlistener`, function(client) {
that.emit('newshadowclient', client); that.emit('registershadowlistener', client);
}); });
this.red.events.on(`${that.namespace}-deregistershadowlistener`, function(client) { this.red.events.on(`${that.namespace}-deregistershadowlistener`, function(client) {
that.emit('startcountdownreset'); that.emit('deregistershadowlistener', client);
//that.emit('startcountdownreset');
}); });
this.red.events.on(`${that.namespace}-flowsub`, function(topic, client) { this.red.events.on(`${that.namespace}-flowsub`, function(topic, client) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment