|
@ -14,11 +14,12 @@ let ParticipantRepo = require('../../repository/mysql/participant.repo');
|
|
let configFile = require('../../include/commons').CONFIG_FILE;
|
|
let configFile = require('../../include/commons').CONFIG_FILE;
|
|
let config = require('../../resources/config/' + configFile);
|
|
let config = require('../../resources/config/' + configFile);
|
|
let redis = RedisClient.redisClient().connection;
|
|
let redis = RedisClient.redisClient().connection;
|
|
let log = require('../../util/log.js');
|
|
|
|
|
|
let logger = require('../../util/log.js');
|
|
let mongoose = require('mongoose');
|
|
let mongoose = require('mongoose');
|
|
|
|
|
|
const RedisKeys = require('../../include/commons').REDIS_KEYS;
|
|
|
|
const Commons = require('../../include/commons');
|
|
|
|
|
|
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
|
|
|
|
const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
|
|
|
|
const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
|
|
|
|
|
|
class Sessions extends RedisModel {
|
|
class Sessions extends RedisModel {
|
|
constructor() {
|
|
constructor() {
|
|
@ -26,62 +27,78 @@ class Sessions extends RedisModel {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 创建会话
|
|
|
|
|
|
* 创建会话。会话的ID来源:
|
|
|
|
* MUC:患者的ID
|
|
|
|
* P2P:对成员的ID排序后,取hash值
|
|
|
|
* GROUP:团队的ID
|
|
*
|
|
*
|
|
* type = 1 sessionid = md5(patientId); MUC
|
|
|
|
* type = 2 sessionId = hash(user1,user2); P2P
|
|
|
|
* type = 3 sessionId = groupid; 团队群聊
|
|
|
|
* @param sessionId 会话ID
|
|
|
|
|
|
* @param sessionId
|
|
* @param name 会话名称
|
|
* @param name 会话名称
|
|
* @param type 会话类型
|
|
* @param type 会话类型
|
|
* @param users 会话成员
|
|
|
|
|
|
* @param participantArray 会话成员
|
|
|
|
* @param handler 回调,仅MUC模式使用
|
|
*/
|
|
*/
|
|
createSession(sessionId, name, type, users,handler) {
|
|
|
|
|
|
createSession(sessionId, name, type, participantArray, handler) {
|
|
let self = this;
|
|
let self = this;
|
|
let _super = super.makeRedisKey;
|
|
|
|
users = eval("["+users+"]")[0];
|
|
|
|
if (type == config.sessionConfig.P2P) {//P2P消息用hash校验
|
|
|
|
var userArray=[];
|
|
|
|
for(var key in users){
|
|
|
|
userArray.push(key);
|
|
|
|
|
|
if (type == SESSION_TYPES.P2P) {
|
|
|
|
var participantIdArray = [];
|
|
|
|
for (let i in participantArray) {
|
|
|
|
participantIdArray.push(participantArray[i].split(":")[0]);
|
|
}
|
|
}
|
|
if(userArray.length>2){
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": "会话人数超过2个无法创建P2P会话!"});
|
|
|
|
|
|
|
|
|
|
if (participantIdArray.length != 2) {
|
|
|
|
ModelUtil.emitDataNotFound(self.eventEmitter, {message: "P2P session only allow 2 participants."});
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
ParticipantRepo.findSessionIdByParticipantIds(userArray[0],userArray[0],function(err,res){
|
|
|
|
|
|
|
|
|
|
ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
|
|
sessionId = res;
|
|
sessionId = res;
|
|
callcreate(sessionId);
|
|
|
|
|
|
callCreate(sessionId);
|
|
});
|
|
});
|
|
}else{
|
|
|
|
callcreate();
|
|
|
|
|
|
} else {
|
|
|
|
callCreate(sessionId);
|
|
}
|
|
}
|
|
function callcreate(){
|
|
|
|
let createDate = new Date();
|
|
|
|
let session_key = _super(RedisKeys.Session, sessionId);
|
|
|
|
let participants = new Participants();
|
|
|
|
|
|
|
|
// 将session加入redis
|
|
|
|
participants.saveParticipantsToRedis(sessionId, users, createDate, function (res) {
|
|
|
|
if (!res) {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": res});
|
|
|
|
} else {
|
|
|
|
let messages = {};
|
|
|
|
messages.senderId = "system";
|
|
|
|
messages.senderName = "系统消息";
|
|
|
|
messages.timestamp = createDate;
|
|
|
|
messages.content = "";
|
|
|
|
messages.contentType = "1";
|
|
|
|
self.updateLastContent(session_key, type, name, messages);
|
|
|
|
if(config.sessionConfig.MUC==type){
|
|
|
|
handler(true);
|
|
|
|
}else {
|
|
|
|
modelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "session create success!"});
|
|
|
|
}
|
|
|
|
self.saveSessionToMysql(sessionId, name, type, createDate);
|
|
|
|
participants.saveParticipantsToMysql(sessionId, users); //创建session成员到数据库
|
|
|
|
|
|
function callCreate(sessionId) {
|
|
|
|
SessionRepo.findOne(sessionId, function (err, res) {
|
|
|
|
if (res.length > 0) {
|
|
|
|
let session = res[0];
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {
|
|
|
|
id: session.id,
|
|
|
|
name: session.name,
|
|
|
|
type: session.type,
|
|
|
|
create_date: session.create_date
|
|
|
|
});
|
|
|
|
return;
|
|
}
|
|
}
|
|
})
|
|
|
|
|
|
|
|
|
|
let createDate = new Date();
|
|
|
|
let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
|
|
|
|
|
|
|
|
// 保存会话及成员至MySQL中
|
|
|
|
self.saveSessionToMysql(sessionId, name, type, createDate, function (err, res) {
|
|
|
|
Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
|
|
|
|
// 保存会话及成员至Redis中,并更新会话的最后状态
|
|
|
|
let isMucSession = SESSION_TYPES.MUC == type;
|
|
|
|
let message = {
|
|
|
|
sender_id: "System",
|
|
|
|
sender_name: "System",
|
|
|
|
content_type: 1,
|
|
|
|
content: "",
|
|
|
|
timestamp: createDate
|
|
|
|
};
|
|
|
|
|
|
|
|
Messages.updateLastContent(sessionKey, type, name, message);
|
|
|
|
Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
|
|
|
|
if (isMucSession) {
|
|
|
|
handler(true, sessionId);
|
|
|
|
} else {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {id: sessionId});
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@ -91,9 +108,10 @@ class Sessions extends RedisModel {
|
|
* @param name
|
|
* @param name
|
|
* @param type
|
|
* @param type
|
|
* @param createDate
|
|
* @param createDate
|
|
|
|
* @param handler
|
|
*/
|
|
*/
|
|
saveSessionToMysql(sessionId, name, type, createDate) {
|
|
|
|
SessionRepo.saveSession(sessionId, name, type, createDate);
|
|
|
|
|
|
saveSessionToMysql(sessionId, name, type, createDate, handler) {
|
|
|
|
SessionRepo.saveSession(sessionId, name, type, createDate, handler);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@ -118,35 +136,36 @@ class Sessions extends RedisModel {
|
|
* 根据用户ID获取用户的session列表
|
|
* 根据用户ID获取用户的session列表
|
|
* @param userId
|
|
* @param userId
|
|
* @param page
|
|
* @param page
|
|
* @param pagesize
|
|
|
|
|
|
* @param size
|
|
*/
|
|
*/
|
|
getUserSessions(userId, page, pagesize) {
|
|
|
|
let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, userId);
|
|
|
|
|
|
getUserSessions(userId, page, size) {
|
|
|
|
let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
|
|
let self = this;
|
|
let self = this;
|
|
let _super = super.makeRedisKey;
|
|
|
|
if (page > 0) {
|
|
if (page > 0) {
|
|
page = page * pagesize;
|
|
|
|
pagesize = pagesize + page;
|
|
|
|
|
|
page = page * size;
|
|
|
|
size = size + page;
|
|
}
|
|
}
|
|
|
|
|
|
//倒序
|
|
|
|
redis.zrevrangeAsync(user_session_key, page, pagesize).then(function (res) {
|
|
|
|
let sessionlist = [];
|
|
|
|
|
|
// 倒序获取
|
|
|
|
redis.zrevrangeAsync(userSessionKey, page, size).then(function (res) {
|
|
|
|
let sessionList = [];
|
|
if (res.length == 0) {
|
|
if (res.length == 0) {
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": 200, "data": res});
|
|
|
|
} else {
|
|
|
|
for (var j in res) {
|
|
|
|
calllist(res[j], j, res.length);
|
|
|
|
}
|
|
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, []);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
for (let i in res) {
|
|
|
|
callGetSessions(res[i], i == res.length - 1);
|
|
}
|
|
}
|
|
function calllist(session, j, _len) {
|
|
|
|
let session_key = _super(RedisKeys.Session, session);
|
|
|
|
redis.hgetallAsync(session_key).then(function (res) {
|
|
|
|
let participants_key = _super(RedisKeys.Participants, session);
|
|
|
|
//当前用户最后一次登录改讨论组时间
|
|
|
|
redis.zscoreAsync(participants_key, userId).then(function (restimestamp) {
|
|
|
|
//时间差获取消息数量
|
|
|
|
callamount(res, j, _len, session, restimestamp);
|
|
|
|
|
|
|
|
|
|
function callGetSessions(sessionId, lastOne) {
|
|
|
|
let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
|
|
|
|
redis.hgetallAsync(sessionKey).then(function (session) {
|
|
|
|
let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
|
|
|
|
|
|
// 对比当前用户最后一次此会话消息的时间与会话中最新的消息时间,以此判断未读消息数量
|
|
|
|
redis.zscoreAsync(sessionParticipantsKey, userId).then(function (lastFetchTime) {
|
|
|
|
callGetUnreadCount(session, sessionId, lastFetchTime, lastOne);
|
|
})
|
|
})
|
|
}).catch(function (err) {
|
|
}).catch(function (err) {
|
|
throw err;
|
|
throw err;
|
|
@ -154,51 +173,58 @@ class Sessions extends RedisModel {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 消息统计
|
|
|
|
* @param res 返回的会话列表
|
|
|
|
* @param j 当前会话列表的位置
|
|
|
|
* @param _len 列表长度 用做返回前端操作
|
|
|
|
* @param session 当前会话
|
|
|
|
* @param restimestamp 当前会话当前用户的最后一次时间搓
|
|
|
|
|
|
* 统计未读消息数。以当前时间为准。
|
|
|
|
*
|
|
|
|
* @param session 返回的会话列表
|
|
|
|
* @param sessionId 当前会话ID
|
|
|
|
* @param lastFetchTime 当前会话当前用户的最后一次时间搓
|
|
|
|
* @param lastOne
|
|
*/
|
|
*/
|
|
function callamount(res, j, _len, session, restimestamp) {
|
|
|
|
let message_time_key = _super(RedisKeys.MessagesByTimestamp, session);
|
|
|
|
redis.zrangebyscoreAsync(message_time_key, restimestamp, (new Date().getTime())).then(function (messagetimelist) {
|
|
|
|
res.sessionId = session;
|
|
|
|
res.message = messagetimelist.length;
|
|
|
|
callrole(res, j, _len,session);
|
|
|
|
}).catch(function (err) {
|
|
|
|
throw err;
|
|
|
|
})
|
|
|
|
|
|
function callGetUnreadCount(session, sessionId, lastFetchTime, lastOne) {
|
|
|
|
let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
|
|
|
|
redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, (new Date().getTime()))
|
|
|
|
.then(function (messagetimelist) {
|
|
|
|
session.id = sessionId;
|
|
|
|
session.unread_count = messagetimelist.length;
|
|
|
|
|
|
|
|
callGetMyRole(session, sessionId, lastOne);
|
|
|
|
})
|
|
|
|
.catch(function (err) {
|
|
|
|
throw err;
|
|
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 用户角色
|
|
|
|
* @param res要返回的JSON
|
|
|
|
* @param j 第N调数据
|
|
|
|
* @param _len 总数据长度
|
|
|
|
|
|
* 获取用户在此会话中的角色。
|
|
|
|
*
|
|
|
|
* @param session 要返回的JSON
|
|
|
|
* @param sessionId
|
|
|
|
* @param lastOne
|
|
*/
|
|
*/
|
|
function callrole(res, j, _len,session){
|
|
|
|
let participants_role_key = _super(RedisKeys.ParticipantsRole, session);
|
|
|
|
redis.hgetAsync(participants_role_key, userId).then(function(role){
|
|
|
|
res.role=role;
|
|
|
|
callback(res, j, _len);
|
|
|
|
|
|
function callGetMyRole(session, sessionId, lastOne) {
|
|
|
|
let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
|
|
|
|
redis.hgetAsync(participantsRoleKey, userId).then(function (role) {
|
|
|
|
session.my_role = role;
|
|
|
|
|
|
|
|
callback(session, lastOne);
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 列表封装完毕后由此回调返回数据界面
|
|
* 列表封装完毕后由此回调返回数据界面
|
|
* @param res
|
|
|
|
* @param j
|
|
|
|
* @param _len
|
|
|
|
|
|
*
|
|
|
|
* @param session
|
|
|
|
* @param lastOne
|
|
*/
|
|
*/
|
|
function callback(res, j, _len) {
|
|
|
|
sessionlist.push(res);
|
|
|
|
if (j == (_len - 1)) {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": 200, "data": sessionlist});
|
|
|
|
|
|
function callback(session, lastOne) {
|
|
|
|
sessionList.push(session);
|
|
|
|
|
|
|
|
if (lastOne) {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, sessionList);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}).catch(function (res) {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, "get list error " + res + ",user:" + userId);
|
|
|
|
|
|
}).catch(function (err) {
|
|
|
|
ModelUtil.emitError(self.eventEmitter, {message: "Get sessions failed: " + err});
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
@ -212,9 +238,9 @@ class Sessions extends RedisModel {
|
|
*/
|
|
*/
|
|
getMessages(sessionId, user, page, pagesize) {
|
|
getMessages(sessionId, user, page, pagesize) {
|
|
let self = this;
|
|
let self = this;
|
|
let message_timestamp_key = super.makeRedisKey(RedisKeys.MessagesByTimestamp, sessionId);
|
|
|
|
let message_key = super.makeRedisKey(RedisKeys.Messages, sessionId);
|
|
|
|
let participants_key = super.makeRedisKey(RedisKeys.Participants, sessionId);
|
|
|
|
|
|
let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
|
|
|
|
let message_key = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
|
|
|
|
let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
//超过最大限制后从mysql获取数据
|
|
//超过最大限制后从mysql获取数据
|
|
if (page * pagesize >= config.sessionConfig.maxMessageCount) {
|
|
if (page * pagesize >= config.sessionConfig.maxMessageCount) {
|
|
self.getMessageFromMySQL(sessionId, page, pagesize, function (err, res) {
|
|
self.getMessageFromMySQL(sessionId, page, pagesize, function (err, res) {
|
|
@ -262,119 +288,102 @@ class Sessions extends RedisModel {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 更新会话最后一条消息
|
|
|
|
|
|
* 保存消息。
|
|
*
|
|
*
|
|
* @param session_key rediskey
|
|
|
|
* @param session_type
|
|
|
|
* @param name 议题名称
|
|
|
|
* @param message
|
|
|
|
* @returns {*}
|
|
|
|
*/
|
|
|
|
updateLastContent(session_key, session_type, name, message) {
|
|
|
|
return redis.hmsetAsync(session_key,
|
|
|
|
"create_date", message.timestamp,
|
|
|
|
"last_content", message.content,
|
|
|
|
"last_content_type", message.contentType,
|
|
|
|
"type", session_type,
|
|
|
|
"senderId", message.senderId,
|
|
|
|
"senderName", message.senderName,
|
|
|
|
"name", name
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* 保存消息
|
|
|
|
|
|
* 也可以根据议题保存消息,但最终还是保存到与会话对象。
|
|
|
|
*
|
|
|
|
* see also: saveMessageByTopic
|
|
*
|
|
*
|
|
* @param message
|
|
* @param message
|
|
* @param sessionId
|
|
* @param sessionId
|
|
*/
|
|
*/
|
|
saveMessageBySession(message, sessionId) {
|
|
|
|
|
|
saveMessageBySession(sessionId, message) {
|
|
let self = this;
|
|
let self = this;
|
|
let messages = new Messages();
|
|
let messages = new Messages();
|
|
let participants = new Participants();
|
|
let participants = new Participants();
|
|
let session_key = super.makeRedisKey(RedisKeys.Session, sessionId);
|
|
|
|
let message_id = mongoose.Types.ObjectId().toString();
|
|
|
|
let session_type = 0;
|
|
|
|
let name = "";
|
|
|
|
participants.existsParticipant(sessionId, message.senderId, function (res) {
|
|
|
|
//校验发送成员是都在讨论组
|
|
|
|
|
|
let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
|
|
|
|
let messageId = mongoose.Types.ObjectId().toString();
|
|
|
|
|
|
|
|
// 检查会话中是否存在此成员
|
|
|
|
participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
|
|
|
|
if (err) {
|
|
|
|
ModelUtil.emitError(self.eventEmitter, "Check session paticipant failed: ", err);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
if (res) {
|
|
if (res) {
|
|
redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
|
|
|
|
session_type = res[0];
|
|
|
|
name = res[1];
|
|
|
|
if (!session_type || !name) {
|
|
|
|
log.error("session is error for key " + session_key);
|
|
|
|
throw "session is not found";
|
|
|
|
}
|
|
|
|
}).then(function (res) {
|
|
|
|
//更新消息相关
|
|
|
|
return messages.saveMessageToRedis(message_id, sessionId, message);
|
|
|
|
}).then(function (res) {
|
|
|
|
//更新session的最后一条聊天记录
|
|
|
|
return self.updateLastContent(session_key, session_type, name, message);
|
|
|
|
|
|
redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
|
|
|
|
let sessionType = res[0];
|
|
|
|
|
|
|
|
messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
|
|
|
|
messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
|
|
|
|
if (err) {
|
|
|
|
ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
|
|
|
|
} else {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
|
|
|
|
}
|
|
|
|
});
|
|
}).then(function (res) {
|
|
}).then(function (res) {
|
|
//操作mysql数据库
|
|
|
|
messages.saveMessageToMysql(message, session_type, message_id, sessionId);
|
|
|
|
//返回数据给前端。
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "发送成功!"});
|
|
|
|
//消息推送
|
|
|
|
}).catch(function (res) {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": res});
|
|
|
|
|
|
// TODO: 消息推送
|
|
|
|
}).catch(function (err) {
|
|
|
|
ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
|
|
})
|
|
})
|
|
} else {
|
|
} else {
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": "用户不在此会话当中!"});
|
|
|
|
|
|
ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
|
|
}
|
|
}
|
|
})
|
|
|
|
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 保存消息
|
|
* 保存消息
|
|
*
|
|
*
|
|
* @param message
|
|
* @param message
|
|
* @param sessionId
|
|
* @param sessionId
|
|
*/
|
|
*/
|
|
saveMessageByTopic(message, sessionId,handler) {
|
|
|
|
|
|
saveMessageByTopic(message, sessionId, handler) {
|
|
let self = this;
|
|
let self = this;
|
|
let messages = new Messages();
|
|
let messages = new Messages();
|
|
let participants = new Participants();
|
|
let participants = new Participants();
|
|
let session_key = super.makeRedisKey(RedisKeys.Session, sessionId);
|
|
|
|
let message_id = mongoose.Types.ObjectId().toString();
|
|
|
|
let session_type = 0;
|
|
|
|
|
|
let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
|
|
|
|
let messageId = mongoose.Types.ObjectId().toString();
|
|
|
|
let sessionType = 0;
|
|
let name = "";
|
|
let name = "";
|
|
participants.existsParticipant(sessionId, message.senderId, function (err,res) {
|
|
|
|
|
|
participants.existsParticipant(sessionId, message.senderId, function (err, res) {
|
|
//校验发送成员是都在讨论组
|
|
//校验发送成员是都在讨论组
|
|
if (res) {
|
|
if (res) {
|
|
redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
|
|
redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
|
|
session_type = res[0];
|
|
|
|
|
|
sessionType = res[0];
|
|
name = res[1];
|
|
name = res[1];
|
|
if (!session_type || !name) {
|
|
|
|
log.error("session is error for key " + session_key);
|
|
|
|
|
|
if (!sessionType || !name) {
|
|
|
|
logger.error("session is error for key " + session_key);
|
|
throw "session is not found";
|
|
throw "session is not found";
|
|
}
|
|
}
|
|
}).then(function (res) {
|
|
}).then(function (res) {
|
|
//更新消息相关
|
|
//更新消息相关
|
|
return messages.saveMessageForRedis(message_id, sessionId, message);
|
|
|
|
|
|
return messages.saveMessageForRedis(messageId, sessionId, message);
|
|
}).then(function (res) {
|
|
}).then(function (res) {
|
|
//更新session的最后一条聊天记录
|
|
//更新session的最后一条聊天记录
|
|
return self.updateLastContent(session_key, session_type, name, message);
|
|
|
|
|
|
return Messages.updateLastContent(session_key, sessionType, name, message);
|
|
}).then(function (res) {
|
|
}).then(function (res) {
|
|
//操作mysql数据库
|
|
//操作mysql数据库
|
|
messages.saveMessageToMysql(message, session_type, message_id, sessionId);
|
|
|
|
|
|
messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
|
|
//返回数据给前端。
|
|
//返回数据给前端。
|
|
handler(null,message_id)
|
|
|
|
|
|
handler(null, messageId)
|
|
//消息推送
|
|
//消息推送
|
|
}).catch(function (res) {
|
|
}).catch(function (res) {
|
|
handler(res,message_id)
|
|
|
|
|
|
handler(res, messageId)
|
|
})
|
|
})
|
|
} else {
|
|
} else {
|
|
handler( "用户不在此会话当中!",message_id);
|
|
|
|
|
|
handler("用户不在此会话当中!", messageId);
|
|
}
|
|
}
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 置顶操作
|
|
* 置顶操作
|
|
*/
|
|
*/
|
|
stickSession(sessionId, user) {
|
|
stickSession(sessionId, user) {
|
|
let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, user);
|
|
|
|
|
|
let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
|
|
let self = this;
|
|
let self = this;
|
|
//取出最大的session
|
|
//取出最大的session
|
|
redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
|
|
redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
|
|
@ -384,17 +393,17 @@ class Sessions extends RedisModel {
|
|
//当前时间搓比redis的时间搓更早证明没有置顶过
|
|
//当前时间搓比redis的时间搓更早证明没有置顶过
|
|
if (scoreres <= nowtime) {
|
|
if (scoreres <= nowtime) {
|
|
//初始化置顶
|
|
//初始化置顶
|
|
redis.zaddAsync(user_session_key, Commons.STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
|
|
|
|
log.info("stickSession:" + sessionId + ",res:" + res);
|
|
|
|
|
|
redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
|
|
|
|
logger.info("stickSession:" + sessionId + ",res:" + res);
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
|
|
}).then(function () {
|
|
}).then(function () {
|
|
SessionRepo.saveStickySession(sessionId, user, Commons.STICKY_SESSION_BASE_SCORE);
|
|
|
|
|
|
SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
|
|
})
|
|
})
|
|
} else {
|
|
} else {
|
|
//已有置顶的数据,取出来加1保存回去
|
|
//已有置顶的数据,取出来加1保存回去
|
|
scoreres = Number(scoreres) + 1;
|
|
scoreres = Number(scoreres) + 1;
|
|
redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
|
|
redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
|
|
log.info("stickSession:" + sessionId + ",res:" + res);
|
|
|
|
|
|
logger.info("stickSession:" + sessionId + ",res:" + res);
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
|
|
}).then(function () {
|
|
}).then(function () {
|
|
SessionRepo.saveStickySession(sessionId, user, scoreres);
|
|
SessionRepo.saveStickySession(sessionId, user, scoreres);
|
|
@ -408,15 +417,15 @@ class Sessions extends RedisModel {
|
|
* 取消会话置顶
|
|
* 取消会话置顶
|
|
*/
|
|
*/
|
|
cancelStickSession(sessionId, user) {
|
|
cancelStickSession(sessionId, user) {
|
|
let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, user);
|
|
|
|
let participants_key = super.makeRedisKey(RedisKeys.Participants, sessionId);
|
|
|
|
|
|
let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
|
|
|
|
let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
let self = this;
|
|
let self = this;
|
|
redis.zscoreAsync(participants_key, user).then(function (res) {
|
|
redis.zscoreAsync(participants_key, user).then(function (res) {
|
|
if (!res) {
|
|
if (!res) {
|
|
res = new Date().getTime();
|
|
res = new Date().getTime();
|
|
}
|
|
}
|
|
redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
|
|
redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
|
|
log.info("cancelStickSession:" + sessionId);
|
|
|
|
|
|
logger.info("cancelStickSession:" + sessionId);
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "取消置顶成功!"});
|
|
ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "取消置顶成功!"});
|
|
}).then(function () {
|
|
}).then(function () {
|
|
SessionRepo.unstickSession(sessionId, user);
|
|
SessionRepo.unstickSession(sessionId, user);
|