|
@ -8,11 +8,10 @@ let RedisModel = require('./../redis.model.js');
|
|
let ModelUtil = require('../../util/model.util');
|
|
let ModelUtil = require('../../util/model.util');
|
|
let Messages = require('../messages/messages');
|
|
let Messages = require('../messages/messages');
|
|
let Users = require('../user/users');
|
|
let Users = require('../user/users');
|
|
let Participants = require('./Participants');
|
|
|
|
|
|
let Participants = require('./participants');
|
|
let SessionRepo = require('../../repository/mysql/session.repo');
|
|
let SessionRepo = require('../../repository/mysql/session.repo');
|
|
let TopicRepo = require('../../repository/mysql/topics.repo');
|
|
let TopicRepo = require('../../repository/mysql/topics.repo');
|
|
let ParticipantRepo = require('../../repository/mysql/participant.repo');
|
|
let ParticipantRepo = require('../../repository/mysql/participant.repo');
|
|
let MessageRepo = require('../../repository/mysql/message.repo');
|
|
|
|
|
|
|
|
let WechatClient = require("../client/wechat.client.js");
|
|
let WechatClient = require("../client/wechat.client.js");
|
|
let AppClient = require("../client/app.client.js");
|
|
let AppClient = require("../client/app.client.js");
|
|
@ -28,7 +27,6 @@ let async = require("async");
|
|
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
|
|
const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
|
|
const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
|
|
const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
|
|
const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
|
|
const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
|
|
const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
|
|
|
|
|
|
|
|
class Sessions extends RedisModel {
|
|
class Sessions extends RedisModel {
|
|
constructor() {
|
|
constructor() {
|
|
@ -51,22 +49,30 @@ class Sessions extends RedisModel {
|
|
let self = this;
|
|
let self = this;
|
|
let messageId = mongoose.Types.ObjectId().toString();
|
|
let messageId = mongoose.Types.ObjectId().toString();
|
|
//创建session到mysql
|
|
//创建session到mysql
|
|
self.createSessionToMysql(sessionId, name, type, participantArray,messageId,function(err,res){
|
|
|
|
if(err){
|
|
|
|
|
|
self.createSessionToMysql(sessionId, name, type, participantArray, messageId, function (err, res) {
|
|
|
|
if (err) {
|
|
logger.error(err);
|
|
logger.error(err);
|
|
}else{
|
|
|
|
logger.info("create session to mysql success :" +JSON.stringify(res));
|
|
|
|
|
|
} else {
|
|
|
|
logger.info("create session to mysql success :" + JSON.stringify(res));
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
|
|
//创建session到redis
|
|
//创建session到redis
|
|
self.createSessionToRedis(sessionId, name, type, participantArray,messageId, function(err,res){
|
|
|
|
if(err){
|
|
|
|
if(handler){handler(err,null);return;};
|
|
|
|
ModelUtil.emitError(self.eventEmitter, {message:err,status:-1}, null);
|
|
|
|
}else{
|
|
|
|
if(handler){handler(null,res);return;};
|
|
|
|
ModelUtil.emitOK(self.eventEmitter,{status:200,data:res});
|
|
|
|
|
|
self.createSessionToRedis(sessionId, name, type, participantArray, messageId, function (err, res) {
|
|
|
|
if (err) {
|
|
|
|
if (handler) {
|
|
|
|
handler(err, null);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
;
|
|
|
|
ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
|
|
|
|
} else {
|
|
|
|
if (handler) {
|
|
|
|
handler(null, res);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
;
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {status: 200, data: res});
|
|
}
|
|
}
|
|
});
|
|
});
|
|
}
|
|
}
|
|
@ -81,20 +87,20 @@ class Sessions extends RedisModel {
|
|
* @param handler
|
|
* @param handler
|
|
* @returns {boolean}
|
|
* @returns {boolean}
|
|
*/
|
|
*/
|
|
createSessionToRedis(sessionId, name, type, participantArray,messageId, handler){
|
|
|
|
|
|
createSessionToRedis(sessionId, name, type, participantArray, messageId, handler) {
|
|
let self = this;
|
|
let self = this;
|
|
let messages = new Messages();
|
|
let messages = new Messages();
|
|
var participantIdArray = [];
|
|
|
|
|
|
let participantIdArray = [];
|
|
for (let i in participantArray) {
|
|
for (let i in participantArray) {
|
|
participantIdArray.push(participantArray[i].split(":")[0]);
|
|
participantIdArray.push(participantArray[i].split(":")[0]);
|
|
}
|
|
}
|
|
if (type == SESSION_TYPES.P2P||type==SESSION_TYPES.SYSTEM) {
|
|
|
|
if(sessionId){
|
|
|
|
|
|
if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
|
|
|
|
if (sessionId) {
|
|
callBusinessType(sessionId);
|
|
callBusinessType(sessionId);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
if (participantIdArray.length != 2) {
|
|
if (participantIdArray.length != 2) {
|
|
handler("P2P session only allow 2 participants.",null);
|
|
|
|
|
|
handler("P2P session only allow 2 participants.", null);
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
|
|
ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
|
|
@ -102,15 +108,15 @@ class Sessions extends RedisModel {
|
|
callBusinessType(sessionId);
|
|
callBusinessType(sessionId);
|
|
});
|
|
});
|
|
} else {
|
|
} else {
|
|
if(!sessionId){
|
|
|
|
handler("MUC OR GROUP session sessionId is not allow null .",null);
|
|
|
|
|
|
if (!sessionId) {
|
|
|
|
handler("MUC OR GROUP session sessionId is not allow null .", null);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
callBusinessType(sessionId);
|
|
callBusinessType(sessionId);
|
|
}
|
|
}
|
|
|
|
|
|
function callBusinessType(sessionId) {
|
|
function callBusinessType(sessionId) {
|
|
ParticipantRepo.getBusinessType(participantIdArray.join("','"),function(err,businessType){
|
|
|
|
|
|
ParticipantRepo.getBusinessType(participantIdArray.join("','"), function (err, businessType) {
|
|
callCreate(sessionId, businessType);
|
|
callCreate(sessionId, businessType);
|
|
});
|
|
});
|
|
}
|
|
}
|
|
@ -124,21 +130,21 @@ class Sessions extends RedisModel {
|
|
content_type: 11,
|
|
content_type: 11,
|
|
content: "会话创建成功",
|
|
content: "会话创建成功",
|
|
timestamp: createDate,
|
|
timestamp: createDate,
|
|
id:messageId
|
|
|
|
|
|
id: messageId
|
|
};
|
|
};
|
|
let session = {
|
|
let session = {
|
|
id :sessionId,
|
|
|
|
name:name,
|
|
|
|
type:type,
|
|
|
|
create_date:createDate.getTime(),
|
|
|
|
business_type:businessType,
|
|
|
|
last_sender_id : message.sender_id,
|
|
|
|
last_sender_name : message.sender_name,
|
|
|
|
|
|
id: sessionId,
|
|
|
|
name: name,
|
|
|
|
type: type,
|
|
|
|
create_date: createDate.getTime(),
|
|
|
|
business_type: businessType,
|
|
|
|
last_sender_id: message.sender_id,
|
|
|
|
last_sender_name: message.sender_name,
|
|
last_message_time: message.timestamp.getTime(),
|
|
last_message_time: message.timestamp.getTime(),
|
|
last_content : message.content,
|
|
|
|
last_content_type : message.content_type
|
|
|
|
|
|
last_content: message.content,
|
|
|
|
last_content_type: message.content_type
|
|
}
|
|
}
|
|
redis.hmsetAsync(sessionKey, session).then(function(){
|
|
|
|
|
|
redis.hmsetAsync(sessionKey, session).then(function () {
|
|
Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
|
|
Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
|
|
handler(null, session);
|
|
handler(null, session);
|
|
//messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message);
|
|
//messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message);
|
|
@ -156,7 +162,7 @@ class Sessions extends RedisModel {
|
|
* @param messageId
|
|
* @param messageId
|
|
* @param handler
|
|
* @param handler
|
|
*/
|
|
*/
|
|
createSessionToMysql(sessionId, name, type, participantArray,messageId, handler){
|
|
|
|
|
|
createSessionToMysql(sessionId, name, type, participantArray, messageId, handler) {
|
|
let self = this;
|
|
let self = this;
|
|
//如果sessionId不存在则执行创建sessionId过程
|
|
//如果sessionId不存在则执行创建sessionId过程
|
|
let participantIdArray = [];
|
|
let participantIdArray = [];
|
|
@ -164,46 +170,53 @@ class Sessions extends RedisModel {
|
|
participantIdArray.push(participantArray[i].split(":")[0]);
|
|
participantIdArray.push(participantArray[i].split(":")[0]);
|
|
}
|
|
}
|
|
//流程1-判断是否存在sessionId不存在则创建对应的sessionId;
|
|
//流程1-判断是否存在sessionId不存在则创建对应的sessionId;
|
|
if(!sessionId){
|
|
|
|
if (type == SESSION_TYPES.P2P||type==SESSION_TYPES.SYSTEM) {
|
|
|
|
|
|
if (!sessionId) {
|
|
|
|
if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
|
|
ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
|
|
ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
|
|
sessionId = res;
|
|
sessionId = res;
|
|
callBusinessType();
|
|
callBusinessType();
|
|
});
|
|
});
|
|
}else{
|
|
|
|
return handler("MUC模式和团队模式,不允许sessionId为空!",null);
|
|
|
|
|
|
} else {
|
|
|
|
return handler("MUC模式和团队模式,不允许sessionId为空!", null);
|
|
}
|
|
}
|
|
}else{
|
|
|
|
|
|
} else {
|
|
callBusinessType();
|
|
callBusinessType();
|
|
}
|
|
}
|
|
|
|
|
|
//流程2-判断session的业务类型;
|
|
//流程2-判断session的业务类型;
|
|
function callBusinessType(){
|
|
|
|
ParticipantRepo.getBusinessType(participantIdArray.join("','"),function(err,businessType){
|
|
|
|
if(err){handler(err,null);return;}
|
|
|
|
|
|
function callBusinessType() {
|
|
|
|
ParticipantRepo.getBusinessType(participantIdArray.join("','"), function (err, businessType) {
|
|
|
|
if (err) {
|
|
|
|
handler(err, null);
|
|
|
|
return;
|
|
|
|
}
|
|
callCreateSession(businessType);
|
|
callCreateSession(businessType);
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
//流程3-发起session创建 返回session实例
|
|
//流程3-发起session创建 返回session实例
|
|
function callCreateSession(businessType){
|
|
|
|
|
|
function callCreateSession(businessType) {
|
|
//查找该sessionId是否存在存在则直接返回实例
|
|
//查找该sessionId是否存在存在则直接返回实例
|
|
SessionRepo.findOne(sessionId, function (err, res) {
|
|
SessionRepo.findOne(sessionId, function (err, res) {
|
|
if (res.length > 0) {//已经存在
|
|
if (res.length > 0) {//已经存在
|
|
handler(null,res[0]);
|
|
|
|
}else{
|
|
|
|
|
|
handler(null, res[0]);
|
|
|
|
} else {
|
|
let createDate = new Date();
|
|
let createDate = new Date();
|
|
let session ={
|
|
|
|
id :sessionId,
|
|
|
|
name:name,
|
|
|
|
type:type,
|
|
|
|
create_date:createDate.getTime(),
|
|
|
|
business_type:businessType
|
|
|
|
|
|
let session = {
|
|
|
|
id: sessionId,
|
|
|
|
name: name,
|
|
|
|
type: type,
|
|
|
|
create_date: createDate.getTime(),
|
|
|
|
business_type: businessType
|
|
};
|
|
};
|
|
|
|
|
|
//将session写入数据库
|
|
//将session写入数据库
|
|
self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
|
|
self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
|
|
if(err){handler(err,null);return;};
|
|
|
|
|
|
if (err) {
|
|
|
|
handler(err, null);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
;
|
|
callCreateParticipants(session);
|
|
callCreateParticipants(session);
|
|
})
|
|
})
|
|
|
|
|
|
@ -212,9 +225,13 @@ class Sessions extends RedisModel {
|
|
}
|
|
}
|
|
|
|
|
|
//流程4-发起session成员创建
|
|
//流程4-发起session成员创建
|
|
function callCreateParticipants(session){
|
|
|
|
|
|
function callCreateParticipants(session) {
|
|
Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
|
|
Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
|
|
if(err){handler(err,null);return;};
|
|
|
|
|
|
if (err) {
|
|
|
|
handler(err, null);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
;
|
|
callBeginTrans(session);
|
|
callBeginTrans(session);
|
|
})
|
|
})
|
|
}
|
|
}
|
|
@ -228,13 +245,13 @@ class Sessions extends RedisModel {
|
|
content_type: 6,
|
|
content_type: 6,
|
|
content: "会话创建成功",
|
|
content: "会话创建成功",
|
|
timestamp: mesDate,
|
|
timestamp: mesDate,
|
|
id:messageId
|
|
|
|
|
|
id: messageId
|
|
};
|
|
};
|
|
|
|
|
|
session.last_sender_id = message.sender_id;
|
|
session.last_sender_id = message.sender_id;
|
|
session.last_sender_name = message.sender_name;
|
|
session.last_sender_name = message.sender_name;
|
|
session.last_message_time = mesDate.getTime();
|
|
session.last_message_time = mesDate.getTime();
|
|
session.last_content = message.content;
|
|
|
|
|
|
session.last_content = message.content;
|
|
session.last_content_type = message.content_type;
|
|
session.last_content_type = message.content_type;
|
|
|
|
|
|
SessionRepo.updateSessionLastStatus(message.sender_id,
|
|
SessionRepo.updateSessionLastStatus(message.sender_id,
|
|
@ -243,9 +260,13 @@ class Sessions extends RedisModel {
|
|
message.content,
|
|
message.content,
|
|
message.content_type,
|
|
message.content_type,
|
|
sessionId, function (err, res) {
|
|
sessionId, function (err, res) {
|
|
if (err) {handler(err,null);return;};
|
|
|
|
handler(null,session);
|
|
|
|
});
|
|
|
|
|
|
if (err) {
|
|
|
|
handler(err, null);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
;
|
|
|
|
handler(null, session);
|
|
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@ -255,10 +276,10 @@ class Sessions extends RedisModel {
|
|
* @param userId
|
|
* @param userId
|
|
* @param dateSpan
|
|
* @param dateSpan
|
|
*/
|
|
*/
|
|
getRecentSessions(userId, dateSpan){
|
|
|
|
|
|
getRecentSessions(userId, dateSpan) {
|
|
let self = this;
|
|
let self = this;
|
|
SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) {
|
|
SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) {
|
|
if(err){
|
|
|
|
|
|
if (err) {
|
|
ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err);
|
|
ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
@ -344,70 +365,68 @@ class Sessions extends RedisModel {
|
|
let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
|
|
let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
|
|
let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
|
|
let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
|
|
let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants,sessionId);
|
|
|
|
|
|
let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
redis.multi()
|
|
redis.multi()
|
|
.hgetall(sessionKey) // 会话实体
|
|
.hgetall(sessionKey) // 会话实体
|
|
.hget(participantsRoleKey, userId) // 用户在此会话中的角色
|
|
.hget(participantsRoleKey, userId) // 用户在此会话中的角色
|
|
.zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
|
|
.zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
|
|
.zrange(participantsKey,0,-1)
|
|
|
|
|
|
.zrange(participantsKey, 0, -1)
|
|
.execAsync()
|
|
.execAsync()
|
|
.then(function (res) {
|
|
.then(function (res) {
|
|
let session = res[0];
|
|
let session = res[0];
|
|
let role = res[1];
|
|
let role = res[1];
|
|
let lastFetchTime = res[2];
|
|
let lastFetchTime = res[2];
|
|
let users = res[3];
|
|
let users = res[3];
|
|
let sessionName="";
|
|
|
|
let otheruserId ="";
|
|
|
|
if(session.type==SESSION_TYPES.P2P){
|
|
|
|
for(var j in users){
|
|
|
|
if(users[j]!=userId){
|
|
|
|
|
|
let sessionName = "";
|
|
|
|
let otheruserId = "";
|
|
|
|
if (session.type == SESSION_TYPES.P2P) {
|
|
|
|
for (let j in users) {
|
|
|
|
if (users[j] != userId) {
|
|
otheruserId = users[j];
|
|
otheruserId = users[j];
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if(!role)role =0;
|
|
|
|
if(!lastFetchTime)lastFetchTime=new Date().getTime();
|
|
|
|
// 计算未读消息数
|
|
|
|
let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
|
|
|
|
redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
|
|
|
|
.then(function (count) {
|
|
|
|
if(!otheruserId)otheruserId=userId;
|
|
|
|
ParticipantRepo.findNameById(otheruserId, function (err, res) {
|
|
|
|
if((res&&res.length==0)||session.type!=SESSION_TYPES.P2P){
|
|
|
|
sessionName = session.name;
|
|
|
|
}else{
|
|
|
|
sessionName = res[0].name;
|
|
|
|
}
|
|
|
|
sessionList.push({
|
|
|
|
id: sessionId,
|
|
|
|
name: sessionName,
|
|
|
|
create_date: session.create_date,
|
|
|
|
last_content_type: session.last_content_type,
|
|
|
|
last_content: session.last_content,
|
|
|
|
sender_id: session.sender_id,
|
|
|
|
type: session.type,
|
|
|
|
sender_name: session.sender_name,
|
|
|
|
unread_count: count,
|
|
|
|
business_type: session.business_type,
|
|
|
|
my_role: role
|
|
|
|
});
|
|
|
|
if (sessionId === sessionIds[sessionIds.length - 1]) {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, sessionList);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
if (!role) role = 0;
|
|
|
|
if (!lastFetchTime) lastFetchTime = new Date().getTime();
|
|
|
|
// 计算未读消息数
|
|
|
|
let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
|
|
|
|
redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
|
|
|
|
.then(function (count) {
|
|
|
|
if (!otheruserId) otheruserId = userId;
|
|
|
|
ParticipantRepo.findNameById(otheruserId, function (err, res) {
|
|
|
|
if ((res && res.length == 0) || session.type != SESSION_TYPES.P2P) {
|
|
|
|
sessionName = session.name;
|
|
|
|
} else {
|
|
|
|
sessionName = res[0].name;
|
|
|
|
}
|
|
|
|
sessionList.push({
|
|
|
|
id: sessionId,
|
|
|
|
name: sessionName,
|
|
|
|
create_date: session.create_date,
|
|
|
|
last_content_type: session.last_content_type,
|
|
|
|
last_content: session.last_content,
|
|
|
|
sender_id: session.sender_id,
|
|
|
|
type: session.type,
|
|
|
|
sender_name: session.sender_name,
|
|
|
|
unread_count: count,
|
|
|
|
business_type: session.business_type,
|
|
|
|
my_role: role
|
|
|
|
});
|
|
|
|
if (sessionId === sessionIds[sessionIds.length - 1]) {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, sessionList);
|
|
|
|
}
|
|
})
|
|
})
|
|
}).catch(function (err) {
|
|
|
|
logger.error("Get sessions failed: ", ex);
|
|
|
|
ModelUtil.emitError(self.eventEmitter, "Get sessions failed: " + err);
|
|
|
|
});
|
|
|
|
|
|
})
|
|
|
|
}).catch(function (err) {
|
|
|
|
logger.error("Get sessions failed: ", ex);
|
|
|
|
ModelUtil.emitError(self.eventEmitter, "Get sessions failed: " + err);
|
|
|
|
});
|
|
});
|
|
});
|
|
}
|
|
}
|
|
]);
|
|
]);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* 获取会话消息。全部,不管已读/未读状态。
|
|
* 获取会话消息。全部,不管已读/未读状态。
|
|
*
|
|
*
|
|
@ -418,14 +437,14 @@ class Sessions extends RedisModel {
|
|
* @param start_msg_id 消息会话最新的一条消息的ID
|
|
* @param start_msg_id 消息会话最新的一条消息的ID
|
|
* @param end_msg_id 消息会话刚开始的消息ID
|
|
* @param end_msg_id 消息会话刚开始的消息ID
|
|
*/
|
|
*/
|
|
getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset,handler) {
|
|
|
|
|
|
getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, handler) {
|
|
let self = this;
|
|
let self = this;
|
|
let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
|
|
let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
|
|
if (!start_msg_id && !end_msg_id) {
|
|
if (!start_msg_id && !end_msg_id) {
|
|
redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
|
|
redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
|
|
if (res.length == 0) {
|
|
if (res.length == 0) {
|
|
if(handler){
|
|
|
|
handler(null,res);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(null, res);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
@ -434,8 +453,8 @@ class Sessions extends RedisModel {
|
|
start_msg_id = res[0];
|
|
start_msg_id = res[0];
|
|
redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
|
|
redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
|
|
if (res.length == 0) {
|
|
if (res.length == 0) {
|
|
if(handler){
|
|
|
|
handler(null,res);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(null, res);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
@ -444,15 +463,15 @@ class Sessions extends RedisModel {
|
|
end_msg_id = res[0];
|
|
end_msg_id = res[0];
|
|
self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
|
|
self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
|
|
if (err) {
|
|
if (err) {
|
|
if(handler){
|
|
|
|
handler(err,null);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(err, null);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
logger.error("getMessagesByPage error" + err);
|
|
logger.error("getMessagesByPage error" + err);
|
|
ModelUtil.emitError(self.eventEmitter, err, err);
|
|
ModelUtil.emitError(self.eventEmitter, err, err);
|
|
} else {
|
|
} else {
|
|
if(handler){
|
|
|
|
handler(null,res);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(null, res);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
@ -463,8 +482,8 @@ class Sessions extends RedisModel {
|
|
} else if (!start_msg_id) {
|
|
} else if (!start_msg_id) {
|
|
redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
|
|
redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
|
|
if (res.length == 0) {
|
|
if (res.length == 0) {
|
|
if(handler){
|
|
|
|
handler(null,res);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(null, res);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
@ -473,15 +492,15 @@ class Sessions extends RedisModel {
|
|
start_msg_id = res[0];
|
|
start_msg_id = res[0];
|
|
self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
|
|
self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
|
|
if (err) {
|
|
if (err) {
|
|
if(handler){
|
|
|
|
handler(err,null);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(err, null);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
logger.error("getMessagesByPage error" + err);
|
|
logger.error("getMessagesByPage error" + err);
|
|
ModelUtil.emitError(self.eventEmitter, err, err);
|
|
ModelUtil.emitError(self.eventEmitter, err, err);
|
|
} else {
|
|
} else {
|
|
if(handler){
|
|
|
|
handler(null,res);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(null, res);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
@ -497,15 +516,15 @@ class Sessions extends RedisModel {
|
|
end_msg_id = res[0];
|
|
end_msg_id = res[0];
|
|
self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, function (err, res) {
|
|
self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, function (err, res) {
|
|
if (err) {
|
|
if (err) {
|
|
if(handler){
|
|
|
|
handler(err,null);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(err, null);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
logger.error("getMessagesByPage error" + err);
|
|
logger.error("getMessagesByPage error" + err);
|
|
ModelUtil.emitError(self.eventEmitter, err, err);
|
|
ModelUtil.emitError(self.eventEmitter, err, err);
|
|
} else {
|
|
} else {
|
|
if(handler){
|
|
|
|
handler(null,res);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(null, res);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
@ -515,15 +534,15 @@ class Sessions extends RedisModel {
|
|
} else {
|
|
} else {
|
|
self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
|
|
self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
|
|
if (err) {
|
|
if (err) {
|
|
if(handler){
|
|
|
|
handler(err,null);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(err, null);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
logger.error("getMessagesByPage error" + err);
|
|
logger.error("getMessagesByPage error" + err);
|
|
ModelUtil.emitError(self.eventEmitter, err, err);
|
|
ModelUtil.emitError(self.eventEmitter, err, err);
|
|
} else {
|
|
} else {
|
|
if(handler){
|
|
|
|
handler(null,res);
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(null, res);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
ModelUtil.emitOK(self.eventEmitter, res);
|
|
@ -602,33 +621,37 @@ class Sessions extends RedisModel {
|
|
getAllSessionsUnreadMessageCount(userId) {
|
|
getAllSessionsUnreadMessageCount(userId) {
|
|
let self = this;
|
|
let self = this;
|
|
let count = 0;
|
|
let count = 0;
|
|
SessionRepo.findAll(userId,function(err,res){
|
|
|
|
if(err){
|
|
|
|
logger.err("getAllSessionsUnreadMessageCount is fail :"+err);
|
|
|
|
|
|
SessionRepo.findAll(userId, function (err, res) {
|
|
|
|
if (err) {
|
|
|
|
ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
if(res.length==0){
|
|
|
|
|
|
|
|
|
|
if (res.length == 0) {
|
|
ModelUtil.emitOK(self.eventEmitter, {count: count});
|
|
ModelUtil.emitOK(self.eventEmitter, {count: count});
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
for(var j in res){
|
|
|
|
if(res[j].type==SESSION_TYPES.SYSTEM){
|
|
|
|
if(j==res.length-1){
|
|
|
|
|
|
|
|
|
|
for (let j in res) {
|
|
|
|
if (res[j].type == SESSION_TYPES.SYSTEM) {
|
|
|
|
if (j == res.length - 1) {
|
|
ModelUtil.emitOK(self.eventEmitter, {count: count});
|
|
ModelUtil.emitOK(self.eventEmitter, {count: count});
|
|
}
|
|
}
|
|
|
|
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
callback(res,j,res[j]);
|
|
|
|
|
|
callback(res, j, res[j]);
|
|
}
|
|
}
|
|
})
|
|
|
|
|
|
});
|
|
|
|
|
|
function callback(res,j,session){
|
|
|
|
self.getSessionUnreadMessageCount(res[j].id,userId,function(err,con){
|
|
|
|
if(err){
|
|
|
|
logger.err("getAllSessionsUnreadMessageCount is fail :"+err);
|
|
|
|
|
|
function callback(res, j, session) {
|
|
|
|
self.getSessionUnreadMessageCount(res[j].id, userId, function (err, con) {
|
|
|
|
if (err) {
|
|
|
|
ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
|
|
}
|
|
}
|
|
count = count+con;
|
|
|
|
if(j==res.length-1){
|
|
|
|
|
|
|
|
|
|
count = count + con;
|
|
|
|
if (j == res.length - 1) {
|
|
ModelUtil.emitOK(self.eventEmitter, {count: count});
|
|
ModelUtil.emitOK(self.eventEmitter, {count: count});
|
|
}
|
|
}
|
|
})
|
|
})
|
|
@ -641,7 +664,7 @@ class Sessions extends RedisModel {
|
|
* @param sessionId
|
|
* @param sessionId
|
|
* @param userId
|
|
* @param userId
|
|
*/
|
|
*/
|
|
getSessionUnreadMessageCount(sessionId, userId,handler) {
|
|
|
|
|
|
getSessionUnreadMessageCount(sessionId, userId, handler) {
|
|
let self = this;
|
|
let self = this;
|
|
let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
|
|
let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
|
|
let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
@ -660,18 +683,18 @@ class Sessions extends RedisModel {
|
|
let now = new Date().getTime();
|
|
let now = new Date().getTime();
|
|
redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
|
|
redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
|
|
.then(function (count) {
|
|
.then(function (count) {
|
|
if(handler){
|
|
|
|
handler(null,count);
|
|
|
|
}else{
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {count: count});
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(null, count);
|
|
|
|
} else {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {count: count});
|
|
}
|
|
}
|
|
})
|
|
})
|
|
}
|
|
}
|
|
], function (err, res) {
|
|
], function (err, res) {
|
|
if (err) {
|
|
if (err) {
|
|
if(handler){
|
|
|
|
handler(err,0);
|
|
|
|
}else{
|
|
|
|
|
|
if (handler) {
|
|
|
|
handler(err, 0);
|
|
|
|
} else {
|
|
ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
|
|
ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@ -767,7 +790,7 @@ class Sessions extends RedisModel {
|
|
//更新用户最后一次获取消息的时间
|
|
//更新用户最后一次获取消息的时间
|
|
Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
|
|
Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
|
|
//更新最后一条消息到数据库
|
|
//更新最后一条消息到数据库
|
|
SessionRepo.updateSessionLastStatus(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionId);
|
|
|
|
|
|
SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
|
|
//将消息保存到数据库
|
|
//将消息保存到数据库
|
|
messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
|
|
messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
|
|
if (err) {
|
|
if (err) {
|
|
@ -801,19 +824,19 @@ class Sessions extends RedisModel {
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
sendTopicMessages(topicId,message){
|
|
|
|
|
|
sendTopicMessages(topicId, message) {
|
|
let self = this;
|
|
let self = this;
|
|
TopicRepo.findAllByTopicId(topicId,function(err,res){
|
|
|
|
if(err||res.length==0){
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {status:-1,"message": "议题获取失败"});
|
|
|
|
|
|
TopicRepo.findAllByTopicId(topicId, function (err, res) {
|
|
|
|
if (err || res.length == 0) {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": "议题获取失败"});
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
self.saveMessageByTopic(message,res[0].session_id,function(err,messageId){
|
|
|
|
if(err){
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {status:-1,"message":err});
|
|
|
|
}else{
|
|
|
|
|
|
self.saveMessageByTopic(message, res[0].session_id, function (err, messageId) {
|
|
|
|
if (err) {
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": err});
|
|
|
|
} else {
|
|
message.id = messageId;
|
|
message.id = messageId;
|
|
ModelUtil.emitOK(self.eventEmitter, {status:200,"message":"发送成功",data:message});
|
|
|
|
|
|
ModelUtil.emitOK(self.eventEmitter, {status: 200, "message": "发送成功", data: message});
|
|
}
|
|
}
|
|
});
|
|
});
|
|
});
|
|
});
|
|
@ -855,7 +878,7 @@ class Sessions extends RedisModel {
|
|
//更新最后一条消息
|
|
//更新最后一条消息
|
|
Messages.updateLastContent(session_key, sessionType, sessionName, message);
|
|
Messages.updateLastContent(session_key, sessionType, sessionName, message);
|
|
//更新session实体的最后一条消息
|
|
//更新session实体的最后一条消息
|
|
SessionRepo.updateSessionLastStatus(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionId);
|
|
|
|
|
|
SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
|
|
|
|
|
|
handler(null, messageId);
|
|
handler(null, messageId);
|
|
}).then(function (res) {
|
|
}).then(function (res) {
|
|
@ -942,7 +965,7 @@ class Sessions extends RedisModel {
|
|
* @param userId
|
|
* @param userId
|
|
*/
|
|
*/
|
|
static updateParticipantLastFetchTime(sessionId, userId, score) {
|
|
static updateParticipantLastFetchTime(sessionId, userId, score) {
|
|
score = score+1;
|
|
|
|
|
|
score = score + 1;
|
|
let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
|
|
redis.zaddAsync(participantsKey, score, userId)
|
|
redis.zaddAsync(participantsKey, score, userId)
|
|
.then(function (res) {
|
|
.then(function (res) {
|