Commit f6206ec0 by Chavee Issariyapat

redesign structure of subscribed topics

parent 2159a50c
...@@ -16,22 +16,20 @@ const Coordinator = function(param={}) { ...@@ -16,22 +16,20 @@ const Coordinator = function(param={}) {
this.flowemitter.on('newclient', function(client){ this.flowemitter.on('newclient', function(client){
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
if (that.flowemitter.sublist[deviceid] == undefined || that.flowemitter.sublist[deviceid] == 0) {
that.flowemitter.sublist[deviceid] = { let nodeList = that.flowemitter.getDeviceList();
count : 1,
token : devicetoken if (nodeList[client.nodeid] && !nodeList[client.nodeid].subbed ) {
} nodeList[client.nodeid].subbed = true;
that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${client.token}`); let deviceid = nodeList[client.nodeid].deviceid;
that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${client.token}`); let devicetoken = nodeList[client.nodeid].devicetoken;
that.remoteclient.publish(`@tap/device/get/${deviceid}:${client.token}`); that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${devicetoken}`);
that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${client.token}`); that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${devicetoken}`);
}
else{ that.remoteclient.publish(`@tap/device/get/${deviceid}:${devicetoken}`);
that.flowemitter.sublist[deviceid].count++; that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${devicetoken}`);
} }
if (DEBUG_SUBLIST) { if (DEBUG_SUBLIST) {
...@@ -42,27 +40,28 @@ const Coordinator = function(param={}) { ...@@ -42,27 +40,28 @@ const Coordinator = function(param={}) {
this.flowemitter.on('flowpub', function(topic, payload, client){ this.flowemitter.on('flowpub', function(topic, payload, client){
let outtopic, outmsg; let outtopic, outmsg;
let deviceid = client.id; let deviceid = client.deviceid;
let devicetoken = client.devicetoken;
switch (topic) { switch (topic) {
case '@shadow/data/update' : case '@shadow/data/update' :
outtopic = `@tap/shadow/update/${deviceid}:${client.token}`; outtopic = `@tap/shadow/update/${deviceid}:${devicetoken}`;
outmsg = payload.toString(); outmsg = payload.toString();
that.remoteclient.publish(outtopic, outmsg); that.remoteclient.publish(outtopic, outmsg);
break; break;
case '@local/shadow/get' : case '@local/shadow/get' :
outtopic = `@tap/shadow/get/${deviceid}:${client.token}`; outtopic = `@tap/shadow/get/${deviceid}:${devicetoken}`;
outmsg = payload.toString(); outmsg = payload.toString();
that.remoteclient.publish(outtopic, outmsg); that.remoteclient.publish(outtopic, outmsg);
break; break;
case '@local/device/get' : case '@local/device/get' :
outtopic = `@tap/device/get/${deviceid}:${client.token}`; outtopic = `@tap/device/get/${deviceid}:${devicetoken}`;
outmsg = payload.toString(); outmsg = payload.toString();
that.remoteclient.publish(outtopic, outmsg); that.remoteclient.publish(outtopic, outmsg);
break; break;
default: default:
if (topic.startsWith('@local/msgout/')) { if (topic.startsWith('@local/msgout/')) {
let part = topic.substr(14); let part = topic.substr(14);
outtopic = `@tap/msg/topic/${deviceid}:${client.token}/${part}`; outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${part}`;
outmsg = payload.toString(); outmsg = payload.toString();
that.remoteclient.publish(outtopic, outmsg); that.remoteclient.publish(outtopic, outmsg);
} }
...@@ -72,11 +71,12 @@ const Coordinator = function(param={}) { ...@@ -72,11 +71,12 @@ const Coordinator = function(param={}) {
this.flowemitter.on('flowsub', function(topic, client) { this.flowemitter.on('flowsub', function(topic, client) {
let deviceid = client.id; let deviceid = client.deviceid;
let devicetoken = client.devicetoken;
// if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker // if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker
if (topic.startsWith('@local/msgin/')) { if (topic.startsWith('@local/msgin/')) {
let msgpart = topic.split('/').splice(4).join('/'); let msgpart = topic.split('/').splice(4).join('/');
outtopic = `@tap/msg/topic/${deviceid}:${client.token}/${msgpart}`; outtopic = `@tap/msg/topic/${deviceid}:${devicetoken}/${msgpart}`;
that.remoteclient.subscribe(outtopic); that.remoteclient.subscribe(outtopic);
} }
else { else {
...@@ -84,19 +84,24 @@ const Coordinator = function(param={}) { ...@@ -84,19 +84,24 @@ const Coordinator = function(param={}) {
}); });
this.remoteclient.on('connect', function() { this.remoteclient.on('connect', function() {
console.log(' [info] Connected to NEXPIE message broker.') console.log(' [info] Connected to MQTT broker.')
that.remoteclient.subscribe('@private/#'); that.remoteclient.subscribe('@private/#');
that.emit('mqttconnected');
// setTimeout(function() {
// for (let deviceid in that.flowemitter.sublist) {
// that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
// that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
// that.remoteclient.publish(`@tap/device/get/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
// that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
// }
// }, 1000);
});
setTimeout(function() { this.remoteclient.on('close', function() {
for (let deviceid in that.flowemitter.sublist) { console.log(' [info] Disconnect from MQTT broker.')
that.emit('mqttdisconnected');
that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
that.remoteclient.publish(`@tap/device/get/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
}
}, 1000);
}); });
this.remoteclient.on('message', function(topic, payload){ this.remoteclient.on('message', function(topic, payload){
......
...@@ -5,38 +5,44 @@ const events = require('events'); ...@@ -5,38 +5,44 @@ const events = require('events');
const FlowEmitter = function(param = {}) { const FlowEmitter = function(param = {}) {
this.red = param.red; // node-red object this.red = param.red; // node-red object
this.devicelist = {};
this.sublist = {}; this.sublist = {};
this.globalcontext = param.globalcontext; this.globalcontext = param.globalcontext;
this.events = new events.EventEmitter; this.events = new events.EventEmitter;
} }
FlowEmitter.prototype.start = function() { FlowEmitter.prototype.start = function() {
let that = this; let that = this;
this.red.events.on('init', function(client) { this.red.events.on('register', function(client) {
that.emit('newclient', {
deviceid : client.deviceid, that.devicelist[client.nodeid] = {
devicetoken : client.devicetoken deviceid : client.deviceid,
}); devicetoken: client.devicetoken,
msgtopic : []
}
that.emit('newclient', client);
}); });
this.red.events.on('deregister', function(client) {
delete that.devicelist[client.nodeid];
});
this.red.events.on('flowsub', function(topic, client) { this.red.events.on('flowsub', function(topic, client) {
that.emit('flowsub', topic, { that.emit('flowsub', topic, client);
id : client.deviceid,
token : client.devicetoken
});
}); });
this.red.events.on('flowpub', function(topic, payload, client) { this.red.events.on('flowpub', function(topic, payload, client) {
that.emit('flowpub', topic, payload, { that.emit('flowpub', topic, payload, client);
id : client.deviceid,
token : client.devicetoken
});
}); });
} }
FlowEmitter.prototype.getDeviceList = function() {
return this.devicelist;
}
FlowEmitter.prototype.pub = function(topic, payload) { FlowEmitter.prototype.pub = function(topic, payload) {
this.red.events.emit(topic, payload); this.red.events.emit(topic, payload);
} }
...@@ -54,6 +60,10 @@ FlowEmitter.prototype.emit = function(eventname, data1, data2, data3) { ...@@ -54,6 +60,10 @@ FlowEmitter.prototype.emit = function(eventname, data1, data2, data3) {
this.events.emit(eventname, data1, data2, data3); this.events.emit(eventname, data1, data2, data3);
} }
FlowEmitter.prototype.redemit = function(eventname, data1, data2, data3) {
this.red.events.emit(eventname, data1, data2, data3);
}
function create(param) { function create(param) {
return new FlowEmitter(param); return new FlowEmitter(param);
} }
...@@ -23,6 +23,10 @@ MQTTClient.prototype.connect = function() { ...@@ -23,6 +23,10 @@ MQTTClient.prototype.connect = function() {
that.emit('disconnect'); that.emit('disconnect');
}); });
this.client.on('close', function() {
that.emit('close');
});
this.client.on('message', async function(topic, payload) { this.client.on('message', async function(topic, payload) {
that.emit('message', topic, payload); that.emit('message', topic, payload);
}); });
...@@ -36,6 +40,7 @@ MQTTClient.prototype.connect = function() { ...@@ -36,6 +40,7 @@ MQTTClient.prototype.connect = function() {
MQTTClient.prototype.publish = function(topic, payload) { MQTTClient.prototype.publish = function(topic, payload) {
if (this.client) { if (this.client) {
console.log(' mqtt pub --> '+topic);
this.client.publish(topic, payload); this.client.publish(topic, payload);
} }
else { else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment