Commit 051d5a26 by Chavee Issariyapat

more updates

parent 689cf194
module.exports.create = create
const DEBUG_SUBLIST = true;
const events = require('events');
const cache = require('./cache');
const Coordinator = function(param={}) {
let that = this;
this.remoteclient = param.remoteclient;
this.flowemitter = param.flowemitter || null;
this.flowemitter.on('newclient', function(client){
let deviceid = client.deviceid;
let devicetoken = client.devicetoken;
console.log('new incoming client', deviceid);
if (that.flowemitter.sublist[deviceid] == undefined || that.flowemitter.sublist[deviceid] == 0) {
that.flowemitter.sublist[deviceid] = {
count : 1,
token : devicetoken
}
that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${client.token}`);
that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${client.token}`);
that.remoteclient.publish(`@tap/device/get/${deviceid}:${client.token}`);
that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${client.token}`);
}
else{
that.flowemitter.sublist[deviceid].count++;
}
if (DEBUG_SUBLIST) {
console.log(`\nshadow client #${deviceid} connected -->`)
console.log(that.flowemitter.sublist);
}
});
/*
this.localserver.on('clientDisconnected', function(client){
console.log('client disconnected', client.id);
let deviceid = client.id.split(':')[0];
if (that.localserver.sublist[deviceid].count <= 1) {
that.remoteclient.unsubscribe(`@tap/shadow/updated/${deviceid}:${client.token}`);
that.remoteclient.unsubscribe(`@tap/device/changed/${deviceid}:${client.token}`);
delete that.localserver.sublist[deviceid];
cache.clear(deviceid);
}
else {
that.localserver.sublist[deviceid].count--;
}
});
this.localserver.on('clientSubscribed', function(packet, client) {
let deviceid = client.id.split(':')[0];
// if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker
if (packet.topic.startsWith('@local/msgin/')) {
let msgpart = packet.topic.split('/').splice(4).join('/');
outtopic = `@tap/msg/topic/${deviceid}:${client.token}/${msgpart}`;
that.remoteclient.subscribe(outtopic);
}
else {
}
});
this.localserver.on('clientPublished', function(packet, client){
console.log('client published', client.id);
let outtopic, outmsg;
let deviceid = client.id.split(':')[0];
switch (packet.topic) {
case '@shadow/data/update' :
outtopic = `@tap/shadow/update/${deviceid}:${client.token}`;
outmsg = packet.payload.toString();
that.remoteclient.publish(outtopic, outmsg);
break;
case '@local/shadow/get' :
outtopic = `@tap/shadow/get/${deviceid}:${client.token}`;
outmsg = packet.payload.toString();
that.remoteclient.publish(outtopic, outmsg);
break;
case '@local/device/get' :
outtopic = `@tap/device/get/${deviceid}:${client.token}`;
outmsg = packet.payload.toString();
that.remoteclient.publish(outtopic, outmsg);
break;
default:
if (packet.topic.startsWith('@local/msgout/')) {
let part = packet.topic.substr(14);
outtopic = `@tap/msg/topic/${deviceid}:${client.token}/${part}`;
outmsg = packet.payload.toString();
that.remoteclient.publish(outtopic, outmsg);
}
}
});
*/
this.flowemitter.on('flowSubscribe', function(topic, client) {
console.log({topic, client})
let deviceid = client.id.split(':')[0];
// if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker
if (topic.startsWith('@local/msgin/')) {
let msgpart = packet.topic.split('/').splice(4).join('/');
outtopic = `@tap/msg/topic/${deviceid}:${client.token}/${msgpart}`;
that.remoteclient.subscribe(outtopic);
}
else {
}
});
this.remoteclient.on('connect', function() {
console.log('Connected to NEXPIE message broker.')
that.remoteclient.subscribe('@private/#');
setTimeout(function() {
for (let deviceid in that.flowemitter.sublist) {
that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
that.remoteclient.publish(`@tap/device/get/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
}
}, 1000);
});
this.remoteclient.on('message', function(topic, payload){
let jsonpayload = {};
try {
jsonpayload = JSON.parse(payload.toString());
}
catch(e) {}
console.log('msg------------------------------')
if (topic.startsWith('@shadow/data/updated')) {
let newpayload = cache.mergeShadow(jsonpayload.deviceid, jsonpayload);
// that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/merged`, JSON.stringify(newpayload) );
// that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/updated`, JSON.stringify(jsonpayload) );
that.flowemitter.publish(`flow:shadow:${jsonpayload.deviceid}`, {
shadowupdated : JSON.stringify(jsonpayload),
shadowmerged : JSON.stringify(newpayload)
});
}
else if (topic.startsWith('@device/status/changed')){
cache.setStatus(jsonpayload.deviceid, jsonpayload);
//that.flowemitter.publish(`${jsonpayload.deviceid}/device/changed`, JSON.stringify(jsonpayload));
that.flowemitter.publish(`flow:device:${jsonpayload.deviceid}`, {
devicechanged : JSON.stringify(jsonpayload)
});
}
else if (topic.startsWith('@private/shadow/data/get/response')){
cache.setShadow(jsonpayload.deviceid, jsonpayload);
//that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/get`, JSON.stringify(jsonpayload));
that.flowemitter.publish(`flow:shadow:${jsonpayload.deviceid}`, {
shadowget : JSON.stringify(jsonpayload)
});
}
else if (topic.startsWith('@private/device/status/get/response')){
cache.setStatus(jsonpayload.deviceid, jsonpayload);
//that.flowemitter.publish(`${jsonpayload.deviceid}/device/get`, JSON.stringify(jsonpayload));
that.flowemitter.publish(`flow:device:${jsonpayload.deviceid}`, {
deviceget : JSON.stringify(jsonpayload)
});
}
else if (topic.startsWith('@msg/')) {
let part = topic.split('/').splice(1).join('/');
let localtopic = `@local/msgin/${part}`;
that.flowemitter.publish(`flow:message`, {
topic: localtopic,
payload: payload
});
//console.log({ topic , payload, localtopic })
// that.flowemitter.publish(`msg:${localtopic}`, {
// msg : {
// localtopic,
// payload
// }
// });
}
});
}
Coordinator.prototype = new events.EventEmitter;
Coordinator.prototype.start = function() {
this.remoteclient.connect();
this.flowemitter.start();
}
function create(param) {
return new Coordinator(param);
}
...@@ -9,10 +9,8 @@ const Coordinator = function(param={}) { ...@@ -9,10 +9,8 @@ const Coordinator = function(param={}) {
let that = this; let that = this;
this.remoteclient = param.remoteclient; this.remoteclient = param.remoteclient;
// this.localserver = param.localserver || null; this.localserver = param.localserver || null;
this.flowemitter = param.flowemitter;
/*
this.localserver.on('clientConnected', function(client){ this.localserver.on('clientConnected', function(client){
console.log('client connected', client.id); console.log('client connected', client.id);
let deviceid = client.id.split(':')[0]; let deviceid = client.id.split(':')[0];
...@@ -149,14 +147,13 @@ const Coordinator = function(param={}) { ...@@ -149,14 +147,13 @@ const Coordinator = function(param={}) {
that.localserver.publish(localtopic, payload); that.localserver.publish(localtopic, payload);
} }
}); });
*/
} }
Coordinator.prototype = new events.EventEmitter; Coordinator.prototype = new events.EventEmitter;
Coordinator.prototype.start = function() { Coordinator.prototype.start = function() {
this.remoteclient.connect(); this.remoteclient.connect();
//this.localserver.start(); this.localserver.start();
this.flowemitter.start();
} }
function create(param) { function create(param) {
......
module.exports.create = create module.exports.create = create
const MQTTClient = require('./mqttclient'); const MQTTClient = require('./mqttclient');
const MQTTServer = require('./mqttserver');
const FlowEmitter = require('./flowemitter'); const FlowEmitter = require('./flowemitter');
const Coordinatior = require('./coordinator');
const events = require('events'); const events = require('events');
...@@ -12,11 +12,11 @@ const FlowAgent = function(option = {}) { ...@@ -12,11 +12,11 @@ const FlowAgent = function(option = {}) {
this.option = { this.option = {
broker_uri : option.broker_uri, broker_uri : option.broker_uri,
flowagentid : option.flowagentid, flowagentid : option.flowagentid,
flowagentsecret : option.flowagentsecret flowagentsecret : option.flowagentsecret,
mode : option.mode,
red : option.red || null
} }
this.context = option.context;
this.remoteclient = MQTTClient.create({ this.remoteclient = MQTTClient.create({
host: that.option.broker_uri, host: that.option.broker_uri,
options: { options: {
...@@ -26,22 +26,56 @@ const FlowAgent = function(option = {}) { ...@@ -26,22 +26,56 @@ const FlowAgent = function(option = {}) {
keepalive: 30 keepalive: 30
} }
}); });
this.flowemitter = FlowEmitter.create();
this.coordinatior = Coordinatior.create({
console.log({
host: that.option.broker_uri,
options: {
clientId: Date.now()+'-'+that.option.flowagentid,
username: that.option.flowagentid,
password: that.option.flowagentsecret,
keepalive: 30
}});
if (this.option.mode == 'mqtt') {
let Coordinatior = require('./coordinator_mqtt');
this.localserver = MQTTServer.create({
host : '0.0.0.0',
port : 51883
});
this.coordinator = Coordinatior.create({
remoteclient: that.remoteclient,
localserver: that.localserver
});
}
else if (this.option.mode == 'emitter') {
let Coordinatior = require('./coordinator_emitter');
this.flowemitter = FlowEmitter.create({red: this.option.red});
this.coordinator = Coordinatior.create({
remoteclient: that.remoteclient, remoteclient: that.remoteclient,
flowemitter: that.flowemitter flowemitter: that.flowemitter
}); });
}
else {
throw new Error("Flow agent mode must be defined to be either 'mqtt' or 'emitter'.");
}
} }
FlowAgent.prototype = new events.EventEmitter; FlowAgent.prototype = new events.EventEmitter;
FlowAgent.prototype.start = function() { FlowAgent.prototype.start = function() {
this.coordinatior.start(); console.log('FLowAgent started...')
this.coordinator.start();
} }
function create() { function create(option) {
return new FlowAgent(); return new FlowAgent(option);
} }
...@@ -3,6 +3,7 @@ module.exports.create = create ...@@ -3,6 +3,7 @@ module.exports.create = create
const events = require('events'); const events = require('events');
const FlowEmitter = function(param = {}) { const FlowEmitter = function(param = {}) {
this.red = param.red; // node-red object
this.sublist = {}; this.sublist = {};
this.globalcontext = param.globalcontext; this.globalcontext = param.globalcontext;
} }
...@@ -12,20 +13,44 @@ FlowEmitter.prototype = new events.EventEmitter; ...@@ -12,20 +13,44 @@ FlowEmitter.prototype = new events.EventEmitter;
FlowEmitter.prototype.start = function() { FlowEmitter.prototype.start = function() {
let that = this; let that = this;
this.red.events.on('init', function(client) {
that.emit('newclient', {
deviceid : client.deviceid,
devicetoken : client.devicetoken
});
});
this.red.events.on('flowSubscribe', function(topic, client) {
that.emit('flowSubscribe', topic, {
id : client.clientId,
token : client.username
});
});
} }
// a method for ndoe-red block to register // // a method for ndoe-red block to register
FlowEmitter.prototype.register = function(deviceid, devicetoken) { // FlowEmitter.prototype.register = function(deviceid, devicetoken) {
let that = this; // let that = this;
// }
// FlowEmitter.prototype.deregister = function(deviceid) {
// let that = this;
// }
FlowEmitter.prototype.publish = function(topic, payload) {
let that = this;
this.red.events.emit(topic, payload);
} }
FlowEmitter.prototype.deregister = function(deviceid) { FlowEmitter.prototype.subscribe = function(topic) {
let that = this; let that = this;
} }
function create() { function create(param) {
return new FlowEmitter(); return new FlowEmitter(param);
} }
/*
module.exports.create = create
const MQTTClient = require('./mqttclient');
const FlowEmitter = require('./flowemitter');
const Coordinatior = require('./coordinator');
const events = require('events');
const FlowAgent = function(option = {}) {
let that = this;
let this.option = {
broker_uri : option.broker_uri,
flowagentid : option.flowagentid,
flowagentsecret : option.flowagentsecret
}
this.context = option.context;
this.remoteclient = MQTTClient.create({
host: that.option.broker_uri,
options: {
clientId: Date.now()+'-'+that.option.flowagentid,
username: that.option.flowagentid,
password: that.option.flowagentsecret,
keepalive: 30
}
});
this.flowemitter = FlowEmitter.create();
this.coordinatior = Coordinatior.create({
remoteclient: that.remoteclient,
flowemitter: that.flowemitter
});
}
FlowAgent.prototype = new events.EventEmitter;
FlowAgent.prototype.start = function() {
this.coordinatior.start();
}
function create() {
return new FlowAgent();
}
*/
\ No newline at end of file
...@@ -3,11 +3,12 @@ require('dotenv').config(); ...@@ -3,11 +3,12 @@ require('dotenv').config();
const config = require('config'); const config = require('config');
const localBrokerURI = require('url').parse(config.get('config.local_broker_uri')); const localBrokerURI = require('url').parse(config.get('config.local_broker_uri'));
const flowagent = require('./FlowAgent').create({ const flowagent = require('./flowagent').create({
broker_uri : config.get('config.remote_broker_uri'), broker_uri : config.get('config.remote_broker_uri'),
flowagentid: config.get('config.flowagent_username'), flowagentid: config.get('config.flowagent_username'),
flowagentsecret: config.get('config.flowagent_password'), flowagentsecret: config.get('config.flowagent_password'),
mqttenabled : true //mode : 'mqtt'
mode : 'emitter'
}); });
flowagent.start(); flowagent.start();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment