Commit 7a965d29 by Chavee Issariyapat

upgrade flowagent to suppport multiple platform

parent bf9006d1
...@@ -11,60 +11,19 @@ const Coordinator = function(param={}) { ...@@ -11,60 +11,19 @@ const Coordinator = function(param={}) {
let that = this; let that = this;
let resettimer = 0; let resettimer = 0;
this.started = false;
this.events = new events.EventEmitter; this.events = new events.EventEmitter;
this.remoteclient = param.remoteclient; this.remoteclient = param.remoteclient;
this.flowemitter = param.flowemitter || null; this.flowemitter = param.flowemitter || null;
// function resetSubscription() {
// }
// function checkupSubscription() {
// let nodeList = that.flowemitter.getDeviceList();
// let sublist = {};
// let topic = '';
// function safeSub(topic) {
// if (topic && !sublist[topic]) {
// that.remoteclient.subscribe(topic);
// sublist[topic] = true;
// }
// }
// for (let nodeid in nodeList) {
// if (!nodeList[nodeid].subbbed) {
// safeSub(`@tap/shadow/updated/${deviceid}:${devicetoken}`);
// safeSub(`@tap/device/changed/${deviceid}:${devicetoken}`);
// let tp;
// for (let i=0; i<nodeList[nodeid].msgtopic.length; i++) {
// safeSub(nodeList[nodeid].msgtopic[i]);
// }
// nodeList[nodeid].subbbed = true;
// }
// }
// }
this.flowemitter.on('newclient', function(client){ this.flowemitter.on('newclient', function(client){
let deviceid = client.deviceid; let deviceid = client.deviceid;
let devicetoken = client.devicetoken; let devicetoken = client.devicetoken;
//let nodeList = that.flowemitter.getDeviceList();
// if (nodeList[client.nodeid] && !nodeList[client.nodeid].subbed ) {
// nodeList[client.nodeid].subbed = true;
// let deviceid = nodeList[client.nodeid].deviceid;
// let devicetoken = nodeList[client.nodeid].devicetoken;
that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${devicetoken}`); that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${devicetoken}`);
that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${devicetoken}`); that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${devicetoken}`);
// that.remoteclient.publish(`@tap/device/get/${deviceid}:${devicetoken}`);
// that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${devicetoken}`);
//}
if (DEBUG_SUBLIST) { if (DEBUG_SUBLIST) {
console.log(`\nshadow client #${deviceid} connected -->`) console.log(`\nshadow client #${deviceid} connected -->`)
console.log(that.flowemitter.sublist); console.log(that.flowemitter.sublist);
...@@ -128,19 +87,10 @@ const Coordinator = function(param={}) { ...@@ -128,19 +87,10 @@ const Coordinator = function(param={}) {
this.remoteclient.on('connect', function() { this.remoteclient.on('connect', function() {
console.log(' [info] Connected to MQTT broker.') console.log(' [info] Connected to MQTT broker.')
that.remoteclient.subscribe('@private/#'); that.remoteclient.subscribe('@private/#');
// setTimeout(function() {
// for (let deviceid in that.flowemitter.sublist) {
// that.remoteclient.subscribe(`@tap/shadow/updated/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
// that.remoteclient.subscribe(`@tap/device/changed/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
// that.remoteclient.publish(`@tap/device/get/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
// that.remoteclient.publish(`@tap/shadow/get/${deviceid}:${that.flowemitter.sublist[deviceid].token}`);
// }
// }, 1000);
}); });
this.remoteclient.on('close', function() { this.remoteclient.on('close', function() {
this.started = false;
console.log(' [info] Disconnect from MQTT broker.') console.log(' [info] Disconnect from MQTT broker.')
}); });
...@@ -185,7 +135,6 @@ const Coordinator = function(param={}) { ...@@ -185,7 +135,6 @@ const Coordinator = function(param={}) {
}); });
} }
}); });
} }
Coordinator.prototype.on = function(eventname, handler) { Coordinator.prototype.on = function(eventname, handler) {
...@@ -197,8 +146,16 @@ Coordinator.prototype.emit = function(eventname, data1, data2, data3) { ...@@ -197,8 +146,16 @@ Coordinator.prototype.emit = function(eventname, data1, data2, data3) {
} }
Coordinator.prototype.start = function() { Coordinator.prototype.start = function() {
if (!this.started) {
this.started = true;
this.remoteclient.connect(); this.remoteclient.connect();
this.flowemitter.start(); this.flowemitter.start();
return true;
}
else {
return false;
}
} }
function create(param) { function create(param) {
......
...@@ -17,24 +17,31 @@ let FlowAgent = function(option = {}) { ...@@ -17,24 +17,31 @@ let FlowAgent = function(option = {}) {
broker_uri : option.broker_uri, broker_uri : option.broker_uri,
flowagentid : option.flowagentid, flowagentid : option.flowagentid,
flowagentsecret : option.flowagentsecret, flowagentsecret : option.flowagentsecret,
red : option.red || null red : option.red || null,
namespace : option.namespace
} }
if (this.option.red) { if (this.option.red) {
this.option.red.events.setMaxListeners(100); this.option.red.events.setMaxListeners(100);
} }
this.remoteclient = MQTTClient.create({ let opt = {
host: that.option.broker_uri,
options: {
clientId: Date.now()+'-'+that.option.flowagentid, clientId: Date.now()+'-'+that.option.flowagentid,
username: that.option.flowagentid, username: that.option.flowagentid,
password: that.option.flowagentsecret, password: that.option.flowagentsecret,
keepalive: 30 keepalive: 30
} }
this.remoteclient = MQTTClient.create({
host: that.option.broker_uri,
options: opt
});
this.flowemitter = FlowEmitter.create({
red: this.option.red,
namespace: this.option.namespace
}); });
this.flowemitter = FlowEmitter.create({red: this.option.red});
this.coordinator = Coordinatior.create({ this.coordinator = Coordinatior.create({
remoteclient: that.remoteclient, remoteclient: that.remoteclient,
flowemitter: that.flowemitter flowemitter: that.flowemitter
...@@ -50,8 +57,9 @@ FlowAgent.prototype.emit = function(eventname, data1, data2, data3) { ...@@ -50,8 +57,9 @@ FlowAgent.prototype.emit = function(eventname, data1, data2, data3) {
} }
FlowAgent.prototype.start = function() { FlowAgent.prototype.start = function() {
console.log('FlowAgent started...') if (this.coordinator.start()) {
this.coordinator.start(); console.log('Staring FlowAgent...')
}
} }
FlowAgent.prototype.getInfo = function() { FlowAgent.prototype.getInfo = function() {
...@@ -62,7 +70,6 @@ FlowAgent.prototype.getInfo = function() { ...@@ -62,7 +70,6 @@ FlowAgent.prototype.getInfo = function() {
} }
} }
function create(option) { function create(option) {
return new FlowAgent(option); return new FlowAgent(option);
} }
...@@ -6,38 +6,38 @@ const FlowEmitter = function(param = {}) { ...@@ -6,38 +6,38 @@ const FlowEmitter = function(param = {}) {
this.red = param.red; // node-red object this.red = param.red; // node-red object
this.globalcontext = param.globalcontext; this.globalcontext = param.globalcontext;
this.events = new events.EventEmitter; this.events = new events.EventEmitter;
this.namespace = param.namespace; // namespace is for communicating with RED.events
} }
FlowEmitter.prototype.start = function() { FlowEmitter.prototype.start = function() {
let that = this; let that = this;
this.red.events.on('register', function(client) { this.red.events.on(`${that.namespace}-register`, function(client) {
that.emit('newclient', client); that.emit('newclient', client);
}); });
this.red.events.on('deregister', function(client) { this.red.events.on(`${that.namespace}-deregister`, function(client) {
//delete that.devicelist[client.nodeid];
that.emit('startcountdownreset'); that.emit('startcountdownreset');
}); });
this.red.events.on('flowsub', function(topic, client) { this.red.events.on(`${that.namespace}-flowsub`, function(topic, client) {
//that.devicelist[client.nodeid].msgtopic.push(topic); //that.devicelist[client.nodeid].msgtopic.push(topic);
that.emit('flowsub', topic, client); that.emit('flowsub', topic, client);
}); });
this.red.events.on('flowpub', function(topic, payload, client) { this.red.events.on(`${that.namespace}-flowpub`, function(topic, payload, client) {
that.emit('flowpub', topic, payload, client); that.emit('flowpub', topic, payload, client);
}); });
} }
FlowEmitter.prototype.pub = function(topic, payload) { FlowEmitter.prototype.pub = function(topic, payload) {
this.red.events.emit(topic, payload); let that = this;
this.red.events.emit(`${that.namespace}-${topic}`, payload);
} }
FlowEmitter.prototype.sub = function(topic) { FlowEmitter.prototype.redemit = function(eventname, data1, data2, data3) {
let that = this; let that = this;
this.red.events.emit(`${that.namespace}-${eventname}`, data1, data2, data3);
} }
FlowEmitter.prototype.on = function(eventname, handler) { FlowEmitter.prototype.on = function(eventname, handler) {
...@@ -48,10 +48,6 @@ FlowEmitter.prototype.emit = function(eventname, data1, data2, data3) { ...@@ -48,10 +48,6 @@ FlowEmitter.prototype.emit = function(eventname, data1, data2, data3) {
this.events.emit(eventname, data1, data2, data3); this.events.emit(eventname, data1, data2, data3);
} }
FlowEmitter.prototype.redemit = function(eventname, data1, data2, data3) {
this.red.events.emit(eventname, data1, data2, data3);
}
function create(param) { function create(param) {
return new FlowEmitter(param); return new FlowEmitter(param);
} }
...@@ -21,10 +21,10 @@ MQTTClient.prototype.connect = function() { ...@@ -21,10 +21,10 @@ MQTTClient.prototype.connect = function() {
this.client = MQTT.connect(this.host, this.options || {}); this.client = MQTT.connect(this.host, this.options || {});
this.client.on('connect', async function() { this.client.on('connect', function() {
for (let topic in that.subscription) { // for (let topic in that.subscription) {
await sleep(100); // await sleep(100);
} // }
that.emit('connect'); that.emit('connect');
}); });
...@@ -49,39 +49,30 @@ MQTTClient.prototype.connect = function() { ...@@ -49,39 +49,30 @@ MQTTClient.prototype.connect = function() {
MQTTClient.prototype.resetbroker = function(topic, payload) { MQTTClient.prototype.resetbroker = function(topic, payload) {
this.client.end(); this.client.end();
//this.subscription = {};
this.client.reconnect(); this.client.reconnect();
} }
MQTTClient.prototype.publish = function(topic, payload) { MQTTClient.prototype.publish = function(topic, payload) {
if (this.client) { if (this.client && this.client.connected) {
//console.log(' mqtt pub --> '+topic);
if (this.subscription[topic]) this.subscription[topic]++;
else this.subscription[topic] = 1;
this.client.publish(topic, payload); this.client.publish(topic, payload);
} }
else { else {
} }
} }
MQTTClient.prototype.subscribe = function(topic) { MQTTClient.prototype.subscribe = function(topic) {
if (this.client) { if (this.client && this.client.connected) {
this.client.subscribe(topic); this.client.subscribe(topic);
} }
else { else {
} }
} }
MQTTClient.prototype.unsubscribe = function(topic) { MQTTClient.prototype.unsubscribe = function(topic) {
if (this.client) { if (this.client && this.client.connected) {
this.client.unsubscribe(topic); this.client.unsubscribe(topic);
} }
else { else {
} }
} }
......
{ {
"name": "flowagent", "name": "flowagent",
"version": "1.0.0", "version": "1.1.1",
"lockfileVersion": 1, "lockfileVersion": 1,
"requires": true, "requires": true,
"dependencies": { "dependencies": {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment