Commit 32f408d7 by zCaesar

add redis atomic by lua;

parent 5f73e514
var config = require('config');
// https://github.com/isaacs/node-lru-cache
var on_message_redis = require('./redis/on_message').on_message_redis
var on_message_redis = require('./redis-atomic/on_message').on_message_redis
var LRU = require("lru-cache"),
cache = LRU({
max: 500,
......
......@@ -3,7 +3,7 @@ var validator = require('./validator');
var config = require('config');
// var seneca = require('seneca')({log: 'silent'}).client({ port: config.get('device_registry_port'), host: config.get('device_registry_host') });
var seneca = require('seneca')({ log: 'silent' }).client({ port: config.get('token_registry_port'), host: config.get('token_registry_host') });
var on_register_redis = require('./redis/on_register').on_register_redis
var on_register_redis = require('./redis-atomic/on_register').on_register_redis
// https://github.com/isaacs/node-lru-cache
var LRU = require("lru-cache"),
......
......@@ -2,7 +2,7 @@
"device_registry_host" : "alpha.nexpie.io",
"device_registry_port" : 8990,
"token_registry_host" : "localhost",
"token_registry_host" : "alpha.nexpie.io",
"token_registry_port" : 8790,
"auth_on_register_debug" : true,
......
var on_offline_redis = require('./redis/on_offline').on_offline_redis
var on_offline_redis = require('./redis-atomic/on_offline').on_offline_redis
module.exports = function (options = {}) {
......
let Redis = require('ioredis')
let redis = new Redis()
let fs = require('fs')
let path = require('path')
function on_message_redis(deviceid, payload, topic) { // first time to access on authhook auth_on_register
if (deviceid.indexOf('mqtt') === 0) {
console.log(deviceid)
}
else {
let information = setValue(deviceid, payload, topic)
let luafile = path.join(__dirname, '/on_message.lua')
let file = fs.readFileSync(luafile)
let luastring = Buffer.from(JSON.parse(JSON.stringify(file)).data).toString('utf8')
redis.defineCommand('on_message', {
numberOfKeys: 2,
lua: luastring
})
let device_keys = information.keys
let broker_keys = '_broker:1'
let last_publish = information.last_publish
let charged_message_count = information.charged_message_count
let total_message_size = information.total_message_size
redis.on_message(device_keys, broker_keys, last_publish, charged_message_count, total_message_size, (err, result) => {
if (err) console.log(err)
else console.log(result)
})
}
}
module.exports.on_message_redis = on_message_redis
redis.on("error", (error) => {
console.log("Redis connection error", error);
});
redis.on('reconnecting', function reconnecting() {
console.log('Connection reestablished');
});
redis.on('connect', function connect() {
console.log('connecting');
});
function setValue(deviceid, payload, topic) {
var dateNow = Math.floor(Date.now() / 1000)
var keys = 'deviceid:' + deviceid
var last_publish = dateNow
var total_message_size = (payload.length + topic.length) * (3 / 4)
var msg_count_size = total_message_size
var charged_message_count = 1
while (msg_count_size > 5000) {
charged_message_count += 1
msg_count_size = msg_count_size - 5000
}
return {
keys: keys,
last_publish: last_publish,
charged_message_count: charged_message_count,
total_message_size: total_message_size
}
}
\ No newline at end of file
local device_keys = KEYS[1]
local broker_keys = KEYS[2]
local charged_message_count = ARGV[1]
local total_message_size = ARGV[2]
local last_publish = ARGV[3]
redis.call('HINCRBY', device_keys, 'actual_message_count', 1)
redis.call('HINCRBY', device_keys, 'charged_message_count', charged_message_count)
redis.call('HINCRBY', device_keys, 'total_message_size', total_message_size)
redis.call('HINCRBY', broker_keys, 'actual_message_count', 1)
redis.call('HINCRBY', broker_keys, 'charged_message_count', charged_message_count)
redis.call('HINCRBY', broker_keys, 'total_message_size', total_message_size)
redis.call('HSET', device_keys, 'last_publish', last_publish)
return 'true'
\ No newline at end of file
var Redis = require('ioredis')
var redis = new Redis()
let fs = require('fs')
let path = require('path')
function on_offline_redis(deviceid) { // first time to access on authhook auth_on_register
if (deviceid.indexOf('mqtt') === 0) {
console.log(deviceid)
}
else {
let information = setValue(deviceid)
let luafile = path.join(__dirname, '/on_offline.lua')
let file = fs.readFileSync(luafile)
let luastring = Buffer.from(JSON.parse(JSON.stringify(file)).data).toString('utf8')
redis.defineCommand('on_offline', {
numberOfKeys: 2,
lua: luastring
})
let device_keys = information.keys
let broker_keys = '_broker:1'
let offline_on = information.offline_on
redis.on_offline(device_keys, broker_keys, offline_on, (err, result) => {
if (err) console.log(err)
else console.log(result)
})
}
}
module.exports.on_offline_redis = on_offline_redis
redis.on("error", (error) => {
console.log("Redis connection error", error);
});
redis.on('reconnecting', function reconnecting() {
console.log('Connection reestablished');
});
redis.on('connect', function connect() {
console.log('connecting');
});
function setValue(deviceid) {
var dateNow = Math.floor(Date.now() / 1000)
var keys = 'deviceid:' + deviceid
var offline_on = dateNow
return {
keys: keys,
offline_on: offline_on
}
}
\ No newline at end of file
local device_keys = KEYS[1]
local broker_keys = KEYS[2]
local offline_on = ARGV[1]
local last_status = redis.call('HGET', device_keys, 'status')
if last_status ~= "false" then
redis.call('HSET', device_keys, 'status', 'false', 'offline_on', offline_on)
redis.call('HINCRBY', broker_keys, 'device_online', -1)
return 'true'
else
return 'false'
end
\ No newline at end of file
var Redis = require('ioredis')
var redis = new Redis()
let fs = require('fs')
let path = require('path')
function on_register_redis(deviceid) { // first time to access on authhook auth_on_register
if (deviceid.indexOf('mqtt') === 0) {
console.log(deviceid)
}
else {
var information = setValue(deviceid)
let luafile = path.join(__dirname, '/on_register.lua')
let file = fs.readFileSync(luafile)
let luastring = Buffer.from(JSON.parse(JSON.stringify(file)).data).toString('utf8')
redis.defineCommand('on_register', {
numberOfKeys: 2,
lua: luastring
})
let device_keys = information.keys
let broker_keys = '_broker:1'
let register_on = information.register_on
let last_check = information.last_check
redis.on_register(device_keys, broker_keys, register_on, last_check, (err, result) => {
if (err) console.log(err)
else console.log(result)
})
}
}
module.exports.on_register_redis = on_register_redis
redis.on("error", (error) => {
console.log("Redis connection error", error);
});
redis.on('reconnecting', function reconnecting() {
console.log('Connection reestablished');
});
redis.on('connect', function connect() {
console.log('connecting');
});
function setValue(deviceid) {
var dateNow = Math.floor(Date.now() / 1000)
var keys = 'deviceid:' + deviceid
var register_on = dateNow
var last_check = dateNow
return {
keys: keys,
register_on: register_on,
last_check: last_check
}
}
\ No newline at end of file
local device_keys = KEYS[1]
local broker_keys = KEYS[2]
local register_on = ARGV[1]
local last_check = ARGV[2]
redis.call('HDEL', device_keys, 'offline_on')
local last_status = redis.call('HGET', device_keys, 'status')
if last_status ~= "true" then
redis.call('HSET', device_keys, 'status', 'true', 'register_on', register_on, 'last_check', last_check)
redis.call('HINCRBY', broker_keys, 'device_online', 1)
return 'true'
else
return 'false'
end
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment