Commit 2dbeaa5d by Chavee Issariyapat

rename file

parent 051d5a26
...@@ -8,13 +8,11 @@ const cache = require('./cache'); ...@@ -8,13 +8,11 @@ const cache = require('./cache');
const Coordinator = function(param={}) { const Coordinator = function(param={}) {
let that = this; let that = this;
this.events = new events.EventEmitter;
this.remoteclient = param.remoteclient; this.remoteclient = param.remoteclient;
this.flowemitter = param.flowemitter || null; this.flowemitter = param.flowemitter || null;
this.flowemitter.on('newclient', function(client){ this.flowemitter.on('newclient', function(client){
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
...@@ -107,7 +105,6 @@ const Coordinator = function(param={}) { ...@@ -107,7 +105,6 @@ const Coordinator = function(param={}) {
*/ */
this.flowemitter.on('flowSubscribe', function(topic, client) { this.flowemitter.on('flowSubscribe', function(topic, client) {
console.log({topic, client})
let deviceid = client.id.split(':')[0]; let deviceid = client.id.split(':')[0];
// if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker // if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker
if (topic.startsWith('@local/msgin/')) { if (topic.startsWith('@local/msgin/')) {
...@@ -144,13 +141,10 @@ console.log({topic, client}) ...@@ -144,13 +141,10 @@ console.log({topic, client})
} }
catch(e) {} catch(e) {}
console.log('msg------------------------------')
if (topic.startsWith('@shadow/data/updated')) { if (topic.startsWith('@shadow/data/updated')) {
let newpayload = cache.mergeShadow(jsonpayload.deviceid, jsonpayload); let newpayload = cache.mergeShadow(jsonpayload.deviceid, jsonpayload);
// that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/merged`, JSON.stringify(newpayload) ); // that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/merged`, JSON.stringify(newpayload) );
// that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/updated`, JSON.stringify(jsonpayload) ); // that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/updated`, JSON.stringify(jsonpayload) );
that.flowemitter.publish(`flow:shadow:${jsonpayload.deviceid}`, { that.flowemitter.publish(`flow:shadow:${jsonpayload.deviceid}`, {
shadowupdated : JSON.stringify(jsonpayload), shadowupdated : JSON.stringify(jsonpayload),
shadowmerged : JSON.stringify(newpayload) shadowmerged : JSON.stringify(newpayload)
...@@ -159,7 +153,6 @@ console.log('msg------------------------------') ...@@ -159,7 +153,6 @@ console.log('msg------------------------------')
else if (topic.startsWith('@device/status/changed')){ else if (topic.startsWith('@device/status/changed')){
cache.setStatus(jsonpayload.deviceid, jsonpayload); cache.setStatus(jsonpayload.deviceid, jsonpayload);
//that.flowemitter.publish(`${jsonpayload.deviceid}/device/changed`, JSON.stringify(jsonpayload)); //that.flowemitter.publish(`${jsonpayload.deviceid}/device/changed`, JSON.stringify(jsonpayload));
that.flowemitter.publish(`flow:device:${jsonpayload.deviceid}`, { that.flowemitter.publish(`flow:device:${jsonpayload.deviceid}`, {
devicechanged : JSON.stringify(jsonpayload) devicechanged : JSON.stringify(jsonpayload)
}); });
...@@ -168,7 +161,6 @@ console.log('msg------------------------------') ...@@ -168,7 +161,6 @@ console.log('msg------------------------------')
else if (topic.startsWith('@private/shadow/data/get/response')){ else if (topic.startsWith('@private/shadow/data/get/response')){
cache.setShadow(jsonpayload.deviceid, jsonpayload); cache.setShadow(jsonpayload.deviceid, jsonpayload);
//that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/get`, JSON.stringify(jsonpayload)); //that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/get`, JSON.stringify(jsonpayload));
that.flowemitter.publish(`flow:shadow:${jsonpayload.deviceid}`, { that.flowemitter.publish(`flow:shadow:${jsonpayload.deviceid}`, {
shadowget : JSON.stringify(jsonpayload) shadowget : JSON.stringify(jsonpayload)
}); });
...@@ -177,14 +169,11 @@ console.log('msg------------------------------') ...@@ -177,14 +169,11 @@ console.log('msg------------------------------')
else if (topic.startsWith('@private/device/status/get/response')){ else if (topic.startsWith('@private/device/status/get/response')){
cache.setStatus(jsonpayload.deviceid, jsonpayload); cache.setStatus(jsonpayload.deviceid, jsonpayload);
//that.flowemitter.publish(`${jsonpayload.deviceid}/device/get`, JSON.stringify(jsonpayload)); //that.flowemitter.publish(`${jsonpayload.deviceid}/device/get`, JSON.stringify(jsonpayload));
that.flowemitter.publish(`flow:device:${jsonpayload.deviceid}`, { that.flowemitter.publish(`flow:device:${jsonpayload.deviceid}`, {
deviceget : JSON.stringify(jsonpayload) deviceget : JSON.stringify(jsonpayload)
}); });
} }
else if (topic.startsWith('@msg/')) { else if (topic.startsWith('@msg/')) {
let part = topic.split('/').splice(1).join('/'); let part = topic.split('/').splice(1).join('/');
let localtopic = `@local/msgin/${part}`; let localtopic = `@local/msgin/${part}`;
...@@ -193,9 +182,6 @@ console.log('msg------------------------------') ...@@ -193,9 +182,6 @@ console.log('msg------------------------------')
payload: payload payload: payload
}); });
//console.log({ topic , payload, localtopic })
// that.flowemitter.publish(`msg:${localtopic}`, { // that.flowemitter.publish(`msg:${localtopic}`, {
// msg : { // msg : {
// localtopic, // localtopic,
...@@ -207,7 +193,14 @@ console.log('msg------------------------------') ...@@ -207,7 +193,14 @@ console.log('msg------------------------------')
} }
Coordinator.prototype = new events.EventEmitter; Coordinator.prototype.on = function(eventname, handler) {
this.events.on(eventname, handler);
}
Coordinator.prototype.emit = function(eventname, data1, data2, data3) {
this.events.emit(eventname, data1, data2, data3);
}
Coordinator.prototype.start = function() { Coordinator.prototype.start = function() {
this.remoteclient.connect(); this.remoteclient.connect();
this.flowemitter.start(); this.flowemitter.start();
......
module.exports.create = create
const DEBUG_SUBLIST = false;
const events = require('events');
const cache = require('./cache');
const Coordinator = function(param={}) {
let that = this;
this.remoteclient = param.remoteclient;
this.localserver = param.localserver || null;
this.localserver.on('clientConnected', function(client){
console.log('client connected', client.id);
let deviceid = client.id.split(':')[0];
client.server.ascoltatore.subscribe(`${deviceid}/+/+`, function(topic, payload, options) {
client.forward(topic, payload, options, topic, 0);
});
if (that.localserver.sublist[deviceid] == undefined || that.localserver.sublist[deviceid] == 0) {
that.localserver.sublist[deviceid] = {
count : 1,
token : client.token
}
that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${client.token}`);
that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${client.token}`);
that.remoteclient.publish(`@tap/device/get/${deviceid}:${client.token}`);
that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${client.token}`);
}
else{
that.localserver.sublist[deviceid].count++;
}
if (DEBUG_SUBLIST) {
console.log(`\nshadow client #${deviceid} connected -->`)
console.log(that.localserver.sublist);
}
});
this.localserver.on('clientDisconnected', function(client){
console.log('client disconnected', client.id);
let deviceid = client.id.split(':')[0];
if (that.localserver.sublist[deviceid].count <= 1) {
that.remoteclient.unsubscribe(`@tap/shadow/updated/${deviceid}:${client.token}`);
that.remoteclient.unsubscribe(`@tap/device/changed/${deviceid}:${client.token}`);
delete that.localserver.sublist[deviceid];
cache.clear(deviceid);
}
else {
that.localserver.sublist[deviceid].count--;
}
});
this.localserver.on('clientSubscribed', function(packet, client) {
let deviceid = client.id.split(':')[0];
// if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker
if (packet.topic.startsWith('@local/msgin/')) {
let msgpart = packet.topic.split('/').splice(4).join('/');
outtopic = `@tap/msg/topic/${deviceid}:${client.token}/${msgpart}`;
that.remoteclient.subscribe(outtopic);
}
else {
}
});
this.localserver.on('clientPublished', function(packet, client){
console.log('client published', client.id);
let outtopic, outmsg;
let deviceid = client.id.split(':')[0];
switch (packet.topic) {
case '@shadow/data/update' :
outtopic = `@tap/shadow/update/${deviceid}:${client.token}`;
outmsg = packet.payload.toString();
that.remoteclient.publish(outtopic, outmsg);
break;
case '@local/shadow/get' :
outtopic = `@tap/shadow/get/${deviceid}:${client.token}`;
outmsg = packet.payload.toString();
that.remoteclient.publish(outtopic, outmsg);
break;
case '@local/device/get' :
outtopic = `@tap/device/get/${deviceid}:${client.token}`;
outmsg = packet.payload.toString();
that.remoteclient.publish(outtopic, outmsg);
break;
default:
if (packet.topic.startsWith('@local/msgout/')) {
let part = packet.topic.substr(14);
outtopic = `@tap/msg/topic/${deviceid}:${client.token}/${part}`;
outmsg = packet.payload.toString();
that.remoteclient.publish(outtopic, outmsg);
}
}
});
this.remoteclient.on('connect', function() {
console.log('Connected to NEXPIE message broker.')
that.remoteclient.subscribe('@private/#');
setTimeout(function() {
for (let deviceid in that.localserver.sublist) {
that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${that.localserver.sublist[deviceid].token}`);
that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${that.localserver.sublist[deviceid].token}`);
that.remoteclient.publish(`@tap/device/get/${deviceid}:${that.localserver.sublist[deviceid].token}`);
that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${that.localserver.sublist[deviceid].token}`);
}
}, 1000);
});
this.remoteclient.on('message', function(topic, payload){
let jsonpayload = {};
try {
jsonpayload = JSON.parse(payload.toString());
}
catch(e) {}
if (topic.startsWith('@shadow/data/updated')) {
let newpayload = cache.mergeShadow(jsonpayload.deviceid, jsonpayload);
that.localserver.publish(`${jsonpayload.deviceid}/shadow/merged`, JSON.stringify(newpayload) );
that.localserver.publish(`${jsonpayload.deviceid}/shadow/updated`, JSON.stringify(jsonpayload) );
}
else if (topic.startsWith('@device/status/changed')){
cache.setStatus(jsonpayload.deviceid, jsonpayload);
that.localserver.publish(`${jsonpayload.deviceid}/device/changed`, JSON.stringify(jsonpayload));
}
else if (topic.startsWith('@private/shadow/data/get/response')){
cache.setShadow(jsonpayload.deviceid, jsonpayload);
that.localserver.publish(`${jsonpayload.deviceid}/shadow/get`, JSON.stringify(jsonpayload));
}
else if (topic.startsWith('@private/device/status/get/response')){
cache.setStatus(jsonpayload.deviceid, jsonpayload);
that.localserver.publish(`${jsonpayload.deviceid}/device/get`, JSON.stringify(jsonpayload));
}
else if (topic.startsWith('@msg/')) {
let part = topic.split('/').splice(1).join('/');
let localtopic = `@local/msgin/${part}`;
that.localserver.publish(localtopic, payload);
}
});
}
Coordinator.prototype = new events.EventEmitter;
Coordinator.prototype.start = function() {
this.remoteclient.connect();
this.localserver.start();
}
function create(param) {
return new Coordinator(param);
}
...@@ -8,12 +8,12 @@ const events = require('events'); ...@@ -8,12 +8,12 @@ const events = require('events');
const FlowAgent = function(option = {}) { const FlowAgent = function(option = {}) {
let that = this; let that = this;
const Coordinatior = require('./coordinator');
this.option = { this.option = {
broker_uri : option.broker_uri, broker_uri : option.broker_uri,
flowagentid : option.flowagentid, flowagentid : option.flowagentid,
flowagentsecret : option.flowagentsecret, flowagentsecret : option.flowagentsecret,
mode : option.mode,
red : option.red || null red : option.red || null
} }
...@@ -27,7 +27,6 @@ const FlowAgent = function(option = {}) { ...@@ -27,7 +27,6 @@ const FlowAgent = function(option = {}) {
} }
}); });
console.log({ console.log({
host: that.option.broker_uri, host: that.option.broker_uri,
options: { options: {
...@@ -37,42 +36,17 @@ const FlowAgent = function(option = {}) { ...@@ -37,42 +36,17 @@ const FlowAgent = function(option = {}) {
keepalive: 30 keepalive: 30
}}); }});
if (this.option.mode == 'mqtt') {
let Coordinatior = require('./coordinator_mqtt');
this.localserver = MQTTServer.create({
host : '0.0.0.0',
port : 51883
});
this.coordinator = Coordinatior.create({
remoteclient: that.remoteclient,
localserver: that.localserver
});
}
else if (this.option.mode == 'emitter') {
let Coordinatior = require('./coordinator_emitter');
this.flowemitter = FlowEmitter.create({red: this.option.red}); this.flowemitter = FlowEmitter.create({red: this.option.red});
this.coordinator = Coordinatior.create({ this.coordinator = Coordinatior.create({
remoteclient: that.remoteclient, remoteclient: that.remoteclient,
flowemitter: that.flowemitter flowemitter: that.flowemitter
}); });
}
else {
throw new Error("Flow agent mode must be defined to be either 'mqtt' or 'emitter'.");
}
} }
FlowAgent.prototype = new events.EventEmitter; FlowAgent.prototype = new events.EventEmitter;
FlowAgent.prototype.start = function() { FlowAgent.prototype.start = function() {
console.log('FLowAgent started...') console.log('FlowAgent started...')
this.coordinator.start(); this.coordinator.start();
} }
......
...@@ -4,11 +4,17 @@ const events = require('events'); ...@@ -4,11 +4,17 @@ const events = require('events');
const FlowEmitter = function(param = {}) { const FlowEmitter = function(param = {}) {
this.red = param.red; // node-red object this.red = param.red; // node-red object
// should be better to check and remove only listeners of a flowagent module
this.red.events.removeAllListeners();
this.sublist = {}; this.sublist = {};
this.globalcontext = param.globalcontext; this.globalcontext = param.globalcontext;
this.events = new events.EventEmitter;
} }
FlowEmitter.prototype = new events.EventEmitter; //FlowEmitter.prototype = new events.EventEmitter;
FlowEmitter.prototype.start = function() { FlowEmitter.prototype.start = function() {
let that = this; let that = this;
...@@ -29,17 +35,6 @@ FlowEmitter.prototype.start = function() { ...@@ -29,17 +35,6 @@ FlowEmitter.prototype.start = function() {
} }
// // a method for ndoe-red block to register
// FlowEmitter.prototype.register = function(deviceid, devicetoken) {
// let that = this;
// }
// FlowEmitter.prototype.deregister = function(deviceid) {
// let that = this;
// }
FlowEmitter.prototype.publish = function(topic, payload) { FlowEmitter.prototype.publish = function(topic, payload) {
let that = this; let that = this;
this.red.events.emit(topic, payload); this.red.events.emit(topic, payload);
...@@ -50,6 +45,13 @@ FlowEmitter.prototype.subscribe = function(topic) { ...@@ -50,6 +45,13 @@ FlowEmitter.prototype.subscribe = function(topic) {
} }
FlowEmitter.prototype.on = function(eventname, handler) {
this.events.on(eventname, handler);
}
FlowEmitter.prototype.emit = function(eventname, data1, data2, data3) {
this.events.emit(eventname, data1, data2, data3);
}
function create(param) { function create(param) {
return new FlowEmitter(param); return new FlowEmitter(param);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment