Commit cd4bc42f by Chavee Issariyapat

udpate

parent a3e59c58
...@@ -104,7 +104,7 @@ const Coordinator = function(param={}) { ...@@ -104,7 +104,7 @@ const Coordinator = function(param={}) {
}); });
*/ */
this.flowemitter.on('flowSubscribe', function(topic, client) { this.flowemitter.on('flowsub', function(topic, client) {
let deviceid = client.id.split(':')[0]; let deviceid = client.id.split(':')[0];
// if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker // if client try to subscribe @msg/xxx/yyy --> flowagent subscribe @tap on a remote broker
if (topic.startsWith('@local/msgin/')) { if (topic.startsWith('@local/msgin/')) {
...@@ -145,39 +145,44 @@ const Coordinator = function(param={}) { ...@@ -145,39 +145,44 @@ const Coordinator = function(param={}) {
let newpayload = cache.mergeShadow(jsonpayload.deviceid, jsonpayload); let newpayload = cache.mergeShadow(jsonpayload.deviceid, jsonpayload);
// that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/merged`, JSON.stringify(newpayload) ); // that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/merged`, JSON.stringify(newpayload) );
// that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/updated`, JSON.stringify(jsonpayload) ); // that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/updated`, JSON.stringify(jsonpayload) );
that.flowemitter.publish(`flow:shadow:${jsonpayload.deviceid}`, { that.flowemitter.pub(`flow:shadow:${jsonpayload.deviceid}`, {
shadowupdated : JSON.stringify(jsonpayload), // shadowupdated : JSON.stringify(jsonpayload),
shadowmerged : JSON.stringify(newpayload) // shadowmerged : JSON.stringify(newpayload)
shadowupdated : jsonpayload,
shadowmerged : newpayload
}); });
} }
else if (topic.startsWith('@device/status/changed')){ else if (topic.startsWith('@device/status/changed')){
cache.setStatus(jsonpayload.deviceid, jsonpayload); cache.setStatus(jsonpayload.deviceid, jsonpayload);
//that.flowemitter.publish(`${jsonpayload.deviceid}/device/changed`, JSON.stringify(jsonpayload)); //that.flowemitter.publish(`${jsonpayload.deviceid}/device/changed`, JSON.stringify(jsonpayload));
that.flowemitter.publish(`flow:device:${jsonpayload.deviceid}`, { that.flowemitter.pub(`flow:device:${jsonpayload.deviceid}`, {
devicechanged : JSON.stringify(jsonpayload) //devicechanged : JSON.stringify(jsonpayload)
devicechanged : jsonpayload
}); });
} }
else if (topic.startsWith('@private/shadow/data/get/response')){ else if (topic.startsWith('@private/shadow/data/get/response')){
cache.setShadow(jsonpayload.deviceid, jsonpayload); cache.setShadow(jsonpayload.deviceid, jsonpayload);
//that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/get`, JSON.stringify(jsonpayload)); //that.flowemitter.publish(`${jsonpayload.deviceid}/shadow/get`, JSON.stringify(jsonpayload));
that.flowemitter.publish(`flow:shadow:${jsonpayload.deviceid}`, { that.flowemitter.pub(`flow:shadow:${jsonpayload.deviceid}`, {
shadowget : JSON.stringify(jsonpayload) //shadowget : JSON.stringify(jsonpayload)
shadowget : jsonpayload
}); });
} }
else if (topic.startsWith('@private/device/status/get/response')){ else if (topic.startsWith('@private/device/status/get/response')){
cache.setStatus(jsonpayload.deviceid, jsonpayload); cache.setStatus(jsonpayload.deviceid, jsonpayload);
//that.flowemitter.publish(`${jsonpayload.deviceid}/device/get`, JSON.stringify(jsonpayload)); //that.flowemitter.publish(`${jsonpayload.deviceid}/device/get`, JSON.stringify(jsonpayload));
that.flowemitter.publish(`flow:device:${jsonpayload.deviceid}`, { that.flowemitter.pub(`flow:device:${jsonpayload.deviceid}`, {
deviceget : JSON.stringify(jsonpayload) //deviceget : JSON.stringify(jsonpayload)
deviceget : jsonpayload
}); });
} }
else if (topic.startsWith('@msg/')) { else if (topic.startsWith('@msg/')) {
let part = topic.split('/').splice(1).join('/'); let part = topic.split('/').splice(1).join('/');
let localtopic = `@local/msgin/${part}`; let localtopic = `@local/msgin/${part}`;
that.flowemitter.publish(`flow:message`, { that.flowemitter.pub(`flow:message`, {
topic: localtopic, topic: localtopic,
payload: payload payload: payload
}); });
......
...@@ -6,10 +6,13 @@ const FlowEmitter = require('./flowemitter'); ...@@ -6,10 +6,13 @@ const FlowEmitter = require('./flowemitter');
const events = require('events'); const events = require('events');
const FlowAgent = function(option = {}) { let FlowAgent = function(option = {}) {
let that = this; let that = this;
const Coordinatior = require('./coordinator'); const Coordinatior = require('./coordinator');
this.events = new events.EventEmitter;
this.option = { this.option = {
broker_uri : option.broker_uri, broker_uri : option.broker_uri,
flowagentid : option.flowagentid, flowagentid : option.flowagentid,
...@@ -17,6 +20,10 @@ const FlowAgent = function(option = {}) { ...@@ -17,6 +20,10 @@ const FlowAgent = function(option = {}) {
red : option.red || null red : option.red || null
} }
// if (!this.option.red.flowevents) {
// this.option.red.flowevents = new events.EventEmitter;
// }
this.remoteclient = MQTTClient.create({ this.remoteclient = MQTTClient.create({
host: that.option.broker_uri, host: that.option.broker_uri,
options: { options: {
...@@ -27,14 +34,14 @@ const FlowAgent = function(option = {}) { ...@@ -27,14 +34,14 @@ const FlowAgent = function(option = {}) {
} }
}); });
console.log({ // console.log({
host: that.option.broker_uri, // host: that.option.broker_uri,
options: { // options: {
clientId: Date.now()+'-'+that.option.flowagentid, // clientId: Date.now()+'-'+that.option.flowagentid,
username: that.option.flowagentid, // username: that.option.flowagentid,
password: that.option.flowagentsecret, // password: that.option.flowagentsecret,
keepalive: 30 // keepalive: 30
}}); // }});
this.flowemitter = FlowEmitter.create({red: this.option.red}); this.flowemitter = FlowEmitter.create({red: this.option.red});
this.coordinator = Coordinatior.create({ this.coordinator = Coordinatior.create({
...@@ -43,7 +50,15 @@ const FlowAgent = function(option = {}) { ...@@ -43,7 +50,15 @@ const FlowAgent = function(option = {}) {
}); });
} }
FlowAgent.prototype = new events.EventEmitter; //FlowAgent.prototype = new events.EventEmitter;
FlowAgent.prototype.on = function(eventname, handler) {
this.on(eventname, handler);
}
FlowAgent.prototype.emit = function(eventname, data1, data2, data3) {
this.emit(eventname, data1, data2, data3);
}
FlowAgent.prototype.start = function() { FlowAgent.prototype.start = function() {
console.log('FlowAgent started...') console.log('FlowAgent started...')
......
...@@ -5,17 +5,12 @@ const events = require('events'); ...@@ -5,17 +5,12 @@ const events = require('events');
const FlowEmitter = function(param = {}) { const FlowEmitter = function(param = {}) {
this.red = param.red; // node-red object this.red = param.red; // node-red object
// should be better to check and remove only listeners of a flowagent module
this.red.events.removeAllListeners();
this.sublist = {}; this.sublist = {};
this.globalcontext = param.globalcontext; this.globalcontext = param.globalcontext;
this.events = new events.EventEmitter; this.events = new events.EventEmitter;
} }
//FlowEmitter.prototype = new events.EventEmitter;
FlowEmitter.prototype.start = function() { FlowEmitter.prototype.start = function() {
let that = this; let that = this;
...@@ -26,8 +21,8 @@ FlowEmitter.prototype.start = function() { ...@@ -26,8 +21,8 @@ FlowEmitter.prototype.start = function() {
}); });
}); });
this.red.events.on('flowSubscribe', function(topic, client) { this.red.events.on('flowsub', function(topic, client) {
that.emit('flowSubscribe', topic, { that.emit('flowsub', topic, {
id : client.clientId, id : client.clientId,
token : client.username token : client.username
}); });
...@@ -35,12 +30,11 @@ FlowEmitter.prototype.start = function() { ...@@ -35,12 +30,11 @@ FlowEmitter.prototype.start = function() {
} }
FlowEmitter.prototype.publish = function(topic, payload) { FlowEmitter.prototype.pub = function(topic, payload) {
let that = this;
this.red.events.emit(topic, payload); this.red.events.emit(topic, payload);
} }
FlowEmitter.prototype.subscribe = function(topic) { FlowEmitter.prototype.sub = function(topic) {
let that = this; let that = this;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment