Commit 96951290 by zCaesar

merge redis to master

parents d43114ae 99c5a125
var config = require('config'); var config = require('config');
// https://github.com/isaacs/node-lru-cache // https://github.com/isaacs/node-lru-cache
var on_message_redis = require('./redis/on_message').on_message_redis
var LRU = require("lru-cache"), var LRU = require("lru-cache"),
cache = LRU({ cache = LRU({
max: 500, max: 500,
...@@ -17,65 +19,68 @@ module.exports = function (options = {}) { ...@@ -17,65 +19,68 @@ module.exports = function (options = {}) {
var topic = req.body.topic; var topic = req.body.topic;
// save redis var authstatus;
require('./redis/on_message').on_message_redis(req.body.client_id, req.body.payload, topic).then(status => {
if (status) {
var authstatus;
var cachekey = 'pub:' + req.body.client_id + ':' + req.body.username + ':' + topic; var cachekey = 'pub:' + req.body.client_id + ':' + req.body.username + ':' + topic;
var authstatus = cache.get(cachekey); var authstatus = cache.get(cachekey);
if (typeof (authstatus) === 'undefined') { if (typeof (authstatus) === 'undefined') {
cache.set(cachekey, true); // cache missed cache.set(cachekey, true); // cache missed
} }
var response var response
var decoded = require('jwt-verify').verify(req.body.username).res var decoded = require('jwt-verify').verify(req.body.username).res
if (decoded) { if (decoded) {
if (decoded.role === 'realtimedb') response = { 'result': 'ok' } if (decoded.role === 'realtimedb') response = { 'result': 'ok' }
else response = { result: {error: 'not allowed'} } else response = { result: { error: 'not allowed' } }
res.send(response); res.send(response);
next(); next();
} }
else { else {
var GGID = require('./utils/getGroupID'); var GGID = require('./utils/getGroupID');
var output = {}; var output = {};
GGID.getGroupID(req.body.username, req.body.client_id, function (group) { GGID.getGroupID(req.body.username, req.body.client_id, function (group) {
if (group) { if (group) {
var _ftopic = require('./utils/router').rewriteTopic(topic, 'pub', group, req.body.client_id, output); // get topic where concat with groupID var _ftopic = require('./utils/router').rewriteTopic(topic, 'pub', group, req.body.client_id, output); // get topic where concat with groupID
response = { response = {
'result': 'ok', 'result': 'ok',
'modifiers': { 'modifiers': {
'topic': _ftopic, 'topic': _ftopic,
'qos': 0, 'qos': 0,
'retain': false 'retain': false
} }
} }
if (options.debug) { if (options.debug) {
console.log(response); console.log(response);
} }
if (output.verb == 'get' || output.verb == 'read') { if (output.verb == 'get' || output.verb == 'read') {
response.modifiers.payload = Buffer.from(req.body.client_id).toString('base64'); response.modifiers.payload = Buffer.from(req.body.client_id).toString('base64');
} }
res.send(response); // save on redis before send response
next(); try {
} on_message_redis(req.body.client_id, req.body.payload, topic)
else { res.send(response);
res.send({ result: {error: 'not allowed'} }); }
next(); catch (e) {
} res.send({ result: { error: 'not allowed' } });
}); }
next();
} }
} else {
}) res.send({ result: { error: 'not allowed' } });
next();
}
});
}
} }
else { else {
res.send({ result: {error: 'not allowed'} }); res.send({ result: { error: 'not allowed' } });
next(); next();
} }
} }
......
...@@ -3,6 +3,7 @@ var validator = require('./validator'); ...@@ -3,6 +3,7 @@ var validator = require('./validator');
var config = require('config'); var config = require('config');
// var seneca = require('seneca')({log: 'silent'}).client({ port: config.get('device_registry_port'), host: config.get('device_registry_host') }); // var seneca = require('seneca')({log: 'silent'}).client({ port: config.get('device_registry_port'), host: config.get('device_registry_host') });
var seneca = require('seneca')({ log: 'silent' }).client({ port: config.get('token_registry_port'), host: config.get('token_registry_host') }); var seneca = require('seneca')({ log: 'silent' }).client({ port: config.get('token_registry_port'), host: config.get('token_registry_host') });
var on_register_redis = require('./redis/on_register').on_register_redis
// https://github.com/isaacs/node-lru-cache // https://github.com/isaacs/node-lru-cache
var LRU = require("lru-cache"), var LRU = require("lru-cache"),
...@@ -44,45 +45,6 @@ function authCheck(client_id, token, password, callback) { ...@@ -44,45 +45,6 @@ function authCheck(client_id, token, password, callback) {
callback(false); callback(false);
} }
}); });
// seneca.act('cmd:getAccessTokenInfo, tokencode:'+token, function(err,res) {
// if (!err && res) {
// var token_profile = (res&&res.result&&res.result[0])?res.result[0]:{};
// var mqttauth = {
// clientid : client_id,
// token : token,
// password : password
// };
// callback( validator.auth_connect(mqttauth, token_profile) );
// }
// else {
// callback(false);
// }
// });
// authclient.act({ role: 'auth', cmd: 'token', action: 'info', token: token }, function (err, res) { // auth client device by query from db
// if (debug) {
// console.log("res ------>\n");
// console.log(res);
// }
// if (res && res.data) {
// try {
// var jdata = JSON.parse(res.data);
// if (jdata && jdata.code == 200) {
// callback(true);
// }
// else callback(false);
// } catch (e) {
// callback(false);
// }
// callback(true);
// }
// else {
// callback(false);
// }
// });
} }
} }
...@@ -101,57 +63,33 @@ module.exports = function (options = {}) { ...@@ -101,57 +63,33 @@ module.exports = function (options = {}) {
var cachekey = 'auth:' + req.body.client_id + ':' + req.body.username + ':' + req.body.password; var cachekey = 'auth:' + req.body.client_id + ':' + req.body.username + ':' + req.body.password;
var authstatus = cache.get(cachekey); var authstatus = cache.get(cachekey);
if (typeof (authstatus) == 'undefined') { if (typeof (authstatus) == 'undefined') {
// cache missed
authCheck(req.body.client_id, req.body.username, req.body.password, function (result) { authCheck(req.body.client_id, req.body.username, req.body.password, function (result) {
cache.set(cachekey, result); cache.set(cachekey, result);
if (result) { if (result) {
require('./redis/on_register').on_register_redis(req.body.client_id).then(status => { try {
console.log('redis:status: ' + status) on_register_redis(req.body.client_id)
if (status) { res.send({ result: 'ok' });
res.send({ }
result: 'ok' catch (e) {
}); res.send({ result: { error: 'not allowed' } });
} else { }
res.send({
result: {error: 'not allowed'}
});
}
})
} else { } else {
res.send({ res.send({ result: { error: 'not allowed' } });
result: {error: 'not allowed'}
});
} }
// const util = require('util')
// console.log(util.inspect(status, false, null, true))
}); });
} }
else { else {
if (authstatus) { if (authstatus) {
require('./redis/on_register').on_register_redis(req.body.client_id).then(status => { on_register_redis(req.body.client_id)
console.log('redis:status: ' + status) res.send({ result: 'ok' });
if (status) {
res.send({
result: 'ok'
});
} else {
res.send({
result: {error: 'not allowed'}
});
}
})
} else { } else {
res.send({ res.send({ result: { error: 'not allowed' } });
result: {error: 'not allowed'}
});
} }
} }
next(); next();
} }
else { else {
res.send({ res.send({ result: { error: 'not allowed' } });
result: 'no'
});
next(); next();
} }
} }
......
{ {
"authserv_host" : "203.151.51.4", "device_registry_host" : "alpha.nexpie.io",
"authserv_port" : 31081, "device_registry_port" : 8990,
"pubca": "-----BEGIN PUBLIC KEY-----\nMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA0oW2b1az7MLTRBQojy8e\nYTmVUcbrHBykyMZCkwt4OdL/4zD6jhOmE9JMr3NfRxEJ2BVj7gasEH2h/mpNRtSK\nHjKsgmFWt22QR/TNwNu7AHUiu+7pxyeOz6IDW9bzrptwwwvFz8wc0oKah6fs0cNY\n/Ln5QwY5CWpGJmHF5BOFE5R9PjC3bY2roKwDbKiw4Kx1rbRxwbSzvFGtBMVyK0Rv\nJpWEN6wTEesAI/bLkPth712zfY/PXFi9Vei/gcLxptceISj/o+PUZm4F6u+rHGGt\nOlcBI8lKiCBO8bggmGgpGCZ1fomj5pOPfInl/6Nn4X0gk/4s0MB7zuhBv45zxF8J\ntbzqQ9TVPdUewMtR5UWoPMGiGt6ZOYRAAmGYSWCsa7hD3plcg0NXYXVuLinyxmgd\nKUv0JmmDqWdkNme9W6m42Jb/zEACcneIIVgUeDkadYaJSjY05d2gn/Vn24f0hqp2\nSQ41fYBxw8eKUJLBNo2c9l+Vz7w/dln4cqI/hatXqOVvLml3CKBGwT9hFjufVTpr\nX5WgDzAxGAl0kO0IucInL/XROezsVcxhyJ2dp44KPOoSIUW8dFz7HNq4t9ENzcPV\nvYfvyS0l00p+ivaqT9iV53Ash6f+tHOR5arTNsuM3DHTi3Fi6qIhXSW5qGUl401c\nzSaVroFHIeQI0ZcTxzAOvS0CAwEAAQ==\n-----END PUBLIC KEY-----",
"device_registry_host" : "127.0.0.1", "token_registry_host" : "alpha.nexpie.io",
"device_registry_port" : 8080 "token_registry_port" : 8790,
"auth_on_register_debug" : true,
"auth_on_publish_debug" : true,
"auth_on_subscribe_debug" : true,
"on_publish_debug" : true,
"on_deliver_debug" : true,
"on_unsubscribe_debug" : true,
"on_offline_debug" : true
} }
...@@ -15,10 +15,10 @@ module.exports = function(options = {}) { ...@@ -15,10 +15,10 @@ module.exports = function(options = {}) {
console.log(response); console.log(response);
res.send(response) res.send(response)
} }
module.exports.on_deliver = on_deliver // module.exports.on_deliver = on_deliver
function joinTopic(topics) { // function joinTopic(topics) {
var topic = topics.split('/!')[1] // var topic = topics.split('/!')[1]
return topics.split('/!')[0] + topic.substring(topic.indexOf('/'), topic.length) // return topics.split('/!')[0] + topic.substring(topic.indexOf('/'), topic.length)
} // }
} }
var on_offline_redis = require('./redis/on_offline').on_offline_redis
module.exports = function (options = {}) { module.exports = function (options = {}) {
debug = options.debug || false; debug = options.debug || false;
return function (req, res, next) { return function (req, res, next) {
console.log('auth_on_offline') console.log('auth_on_offline')
doRedis(req.body.client_id) on_offline_redis(req.body.client_id)
res.status(200); res.status(200);
res.send(''); res.send('');
} }
function doRedis(client_id) {
if (client_id.indexOf('mqttjs') === -1) {
require('./redis/on_offline').on_offline_redis(client_id).then(status => {
console.log('redis:status: ' + status)
})
}
}
} }
\ No newline at end of file
...@@ -2,30 +2,37 @@ var Redis = require('ioredis') ...@@ -2,30 +2,37 @@ var Redis = require('ioredis')
var redis = new Redis() var redis = new Redis()
function on_message_redis(deviceid, payload, topic) { // first time to access on authhook auth_on_register function on_message_redis(deviceid, payload, topic) { // first time to access on authhook auth_on_register
return new Promise((resolve, reject) => { if (deviceid.indexOf('mqtt') === 0) {
if (deviceid.indexOf('mqtt') === 0 || deviceid.length !== 36) { console.log(deviceid)
console.log(deviceid) }
resolve(true) else {
} var information = setValue(deviceid, payload, topic)
else { console.log(information.total_message_size)
var information = setValue(deviceid, payload, topic) redis.hincrby(information.keys, 'actual_message_count', 1)
console.log(information.total_message_size) redis.hincrby(information.keys, 'charged_message_count', information.charged_message_count)
redis.hincrby(information.keys, 'actual_message_count', 1) redis.hincrby(information.keys, 'total_message_size', Math.floor(information.total_message_size))
redis.hincrby(information.keys, 'charged_message_count', information.charged_message_count)
redis.hincrby(information.keys, 'total_message_size', Math.floor(information.total_message_size)) redis.hincrby(setBrokerValue().keys, 'actual_message_count', 1)
redis.hincrby(setBrokerValue().keys, 'charged_message_count', information.charged_message_count)
redis.hincrby(setBrokerValue().keys, 'actual_message_count', 1) redis.hincrby(setBrokerValue().keys, 'total_message_size', Math.floor(information.total_message_size))
redis.hincrby(setBrokerValue().keys, 'charged_message_count', information.charged_message_count)
redis.hincrby(setBrokerValue().keys, 'total_message_size', Math.floor(information.total_message_size)) redis.hset(information.keys, 'last_publish', information.last_publish)
}
redis.hset(information.keys, 'last_publish', information.last_publish, function (err, res) {
resolve(!res) // 0 is ok, 1 is no => if ok is 0 then not 0 = 1
})
}
})
} }
module.exports.on_message_redis = on_message_redis module.exports.on_message_redis = on_message_redis
redis.on("error", (error) => {
console.log("Redis connection error", error);
});
redis.on('reconnecting', function reconnecting() {
console.log('Connection reestablished');
});
redis.on('connect', function connect() {
console.log('connecting');
});
function setValue(deviceid, payload, topic) { function setValue(deviceid, payload, topic) {
var dateNow = Math.floor(Date.now() / 1000) var dateNow = Math.floor(Date.now() / 1000)
var keys = 'deviceid:' + deviceid var keys = 'deviceid:' + deviceid
......
...@@ -2,24 +2,31 @@ var Redis = require('ioredis') ...@@ -2,24 +2,31 @@ var Redis = require('ioredis')
var redis = new Redis() var redis = new Redis()
function on_offline_redis(deviceid) { // first time to access on authhook auth_on_register function on_offline_redis(deviceid) { // first time to access on authhook auth_on_register
return new Promise((resolve, reject) => { if (deviceid.indexOf('mqtt') === 0) {
if (deviceid.indexOf('mqtt') === 0 || deviceid.length !== 36) { }
resolve(true) else {
} var information = setValue(deviceid)
else { var status = redis.hget(information.keys, 'status')
var information = setValue(deviceid) if (status !== false) {
redis.hset(information.keys, 'status', information.status, 'offline_on', information.offline_on, function (err, res) { redis.hset(information.keys, 'status', information.status, 'offline_on', information.offline_on)
console.log(res) deviceUsageCount()
if (res) {
deviceUsageCount()
resolve(!res) // 0 is ok, 1 is no => if ok is 0 then not 0 = 1
} else resolve(!res)
})
} }
}) }
} }
module.exports.on_offline_redis = on_offline_redis module.exports.on_offline_redis = on_offline_redis
redis.on("error", (error) => {
console.log("Redis connection error", error);
});
redis.on('reconnecting', function reconnecting() {
console.log('Connection reestablished');
});
redis.on('connect', function connect() {
console.log('connecting');
});
function setValue(deviceid) { function setValue(deviceid) {
var dateNow = Math.floor(Date.now() / 1000) var dateNow = Math.floor(Date.now() / 1000)
var keys = 'deviceid:' + deviceid var keys = 'deviceid:' + deviceid
......
...@@ -2,25 +2,32 @@ var Redis = require('ioredis') ...@@ -2,25 +2,32 @@ var Redis = require('ioredis')
var redis = new Redis() var redis = new Redis()
function on_register_redis(deviceid) { // first time to access on authhook auth_on_register function on_register_redis(deviceid) { // first time to access on authhook auth_on_register
return new Promise((resolve, reject) => { if (deviceid.indexOf('mqtt') === 0) {
if (deviceid.indexOf('mqtt') === 0 || deviceid.length !== 36) { }
resolve(true) else {
} var information = setValue(deviceid)
else { redis.hdel(information.keys, 'offline_on')
var information = setValue(deviceid) var status = redis.hget(information.keys, 'status')
redis.hdel(information.keys, 'offline_on') if (status !== true) {
redis.hset(information.keys, 'status', information.status, 'register_on', information.register_on, 'last_check', information.last_check, function (err, res) { redis.hset(information.keys, 'status', information.status, 'register_on', information.register_on, 'last_check', information.last_check)
console.log(res) deviceUsageCount()
if (!res) {
deviceUsageCount()
resolve(!res) // 0 is ok, 1 is no => if ok is 0 then not 0 = 1
} else resolve(!res)
})
} }
}) }
} }
module.exports.on_register_redis = on_register_redis module.exports.on_register_redis = on_register_redis
redis.on("error", (error) => {
console.log("Redis connection error", error);
});
redis.on('reconnecting', function reconnecting() {
console.log('Connection reestablished');
});
redis.on('connect', function connect() {
console.log('connecting');
});
function setValue(deviceid) { function setValue(deviceid) {
var dateNow = Math.floor(Date.now() / 1000) var dateNow = Math.floor(Date.now() / 1000)
var keys = 'deviceid:' + deviceid var keys = 'deviceid:' + deviceid
......
var Redis = require('ioredis')
var redis = new Redis()
redis.subscribe('clientStatus')
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: redis-service
spec:
ports:
- port: 6379
targetPort: 6379
selector:
"app": "nexpieio-vernemq-broker"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment