/** * 会话模型。 */ "use strict"; let RedisClient = require('../../repository/redis/redis.client.js'); let redisClient = RedisClient.redisClient(); let redis = redisClient.connection; let RedisModel = require('./../redis.model.js'); let modelUtil = require('../../util/model.util'); let Messages = require('../messages/messages'); let Participants = require('./Participants') let log = require('../../util/log.js'); const RedisKeys = require('../../include/commons').REDIS_KEYS; const Commons = require('../../include/commons'); let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); let SessionRepo = require('../../repository/mysql/session.repo'); let mongoose = require('mongoose'); class Sessions extends RedisModel { constructor() { super(); } /** * 获取某个用户的全部session列表 * @param userId * @param handler */ getUserSessionsFromMysql(userId,handler){ SessionRepo.getUserSessionsFromMysql(userId,handler); } /** * 获取session单个对象 * @param sessionId * @param handler */ getSessions(sessionId,handler){ SessionRepo.getSessions(sessionId,handler); } /** * 根据用户ID获取用户的session列表 * @param userId */ getUserSessions(userId,page,pagesize){ let user_session_key = super.makeRedisKey(RedisKeys.UsersSessions,userId); let self = this; let _super = super.makeRedisKey; if(page >0){ page = page*pagesize; pagesize = pagesize+page; } //倒序 redis.zrevrangeAsync(user_session_key,page,pagesize).then(function(res){ let sessionlist =[]; if(res.length==0){ modelUtil.emitData(self.eventEmitter,{"status":200,"data":res}); }else{ for(var j in res){ calllist(res[j],j,res.length); } } function calllist(session,j,_len){ let session_key =_super(RedisKeys.Session,session); redis.hgetallAsync(session_key).then(function(res){ let participants_key = _super(RedisKeys.Participants,session); //当前用户最后一次登录改讨论组时间 redis.zscoreAsync(participants_key,userId).then(function(restimestamp){ //时间差获取消息数量 callamount(res,j,_len,session,restimestamp); }) }).catch(function(err){ throw err; }) } /** * 消息统计 * @param res 返回的会话列表 * @param j 当前会话列表的位置 * @param _len 列表长度 用做返回前端操作 * @param session 当前会话 * @param restimestamp 当前会话当前用户的最后一次时间搓 */ function callamount(res,j,_len,session,restimestamp){ let message_time_key = _super(RedisKeys.MessagesTimestamp,session); redis.zrangebyscoreAsync(message_time_key,restimestamp,(new Date().getTime())).then(function(messagetimelist){ res.sessionId = session; res.message = messagetimelist.length; callback(res,j,_len); }).catch(function(err){ throw err; }) } /** * 列表封装完毕后由此回调返回数据界面 * @param res * @param j * @param _len */ function callback(res,j,_len){ sessionlist.push(res); if(j==(_len-1)){ modelUtil.emitData(self.eventEmitter,{"status":200,"data":sessionlist}); } } }).catch(function(res){ modelUtil.emitData(self.eventEmitter,"get list error "+res+",user:"+userId); }) } /** * 根据sessionId获取对应的消息 * @param sessionId 会话ID * @param user 拉取消息的人 * @param page 第几页 * @param pagesize 分页数量 */ getSessionMessages(sessionId,user,page,pagesize){ let self = this; let message_timestamp_key = super.makeRedisKey(RedisKeys.MessagesTimestamp,sessionId); let message_key = super.makeRedisKey(RedisKeys.Messages,sessionId); let participants_key = super.makeRedisKey(RedisKeys.Participants,sessionId); //超过最大限制后从mysql获取数据 if(page*pagesize>=config.sessionConfig.maxMessageCount){ let message = new Messages(); message.getMessageByPage(sessionId,page,pagesize,function(err,res){ if(!err){ modelUtil.emitData(self.eventEmitter,{"status":200,"data":res}); }else{ modelUtil.emitData(self.eventEmitter,{"status":-1,"data":err}); } }) }else{ if(page>0){ page = page*pagesize; pagesize = pagesize+page; } let participants = new Participants(); participants.existsUser(sessionId,user,function(res){ if(!res){ modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":"用户不在此会话中!"}); }else{ //倒序取出最后N条消息 redis.zrevrangeAsync(message_timestamp_key,page,pagesize).then(function(res){ //取出消息实体 if(res.length==0){ modelUtil.emitData(self.eventEmitter,{"status":200,"data":[]}); return; } redis.hmgetAsync(message_key,res).then(function(messages) { console.log(messages) //将取到的消息返回给前端 modelUtil.emitData(self.eventEmitter,{"status":200,"data":messages}); }).then(function(){ //更新患者最后一次获取消息的日期 redis.zaddAsync(participants_key, (new Date().getTime()),user).then(function(res){ console.log(res); }).catch(function(res){ throw res; }) }) }).catch(function(res){ modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":res}); }) } }) } } /** * 更新最后一条消息 * @param session_key rediskey * @param create_date 创建时间 * @param last_content 最后一条消息内容 * @param last_content_type 消息类型 * @param type 会话类型 * @param sender 发送者ID * @param sendername 发送者名字 * @param name 议题名称 * @returns {*} */ updateLastContent(session_key,session_type,name,message){ return redis.hmsetAsync(session_key, "create_date", message.timestamp, "last_content", message.content, "last_content_type", message.contentType, "type", session_type, "senderId",message.senderId, "senderName",message.senderName, "name",name ); } /** * 保存消息,用于消息发送 * @param message * @param sessionId */ saveMessageBySession(message,sessionId) { let self = this; let messages = new Messages(); let participants = new Participants(); let session_key = super.makeRedisKey(RedisKeys.Session,sessionId); let message_id = mongoose.Types.ObjectId().toString(); let session_type = 0; let name = ""; participants.existsUser(sessionId,messages.senderId,function(res){ //校验发送成员是都在讨论组 if(res){ redis.hmgetAsync(session_key, ["type","name"]).then(function(res){ session_type = res[0]; name = res[1]; if(!session_type||!name){ log.error("session is error for key "+session_key); throw "session is not found"; } }).then(function(res){ //更新消息相关 return messages.saveMessageForRedis(message_id,sessionId,message); }).then(function (res) { //更新session的最后一条聊天记录 return self.updateLastContent(session_key,session_type,name,message); }).then(function (res) { //操作mysql数据库 messages.saveMessageForMysql(message,session_type,message_id,sessionId); //返回数据给前端。 modelUtil.emitData(self.eventEmitter, {"status":200,"msg":"发送成功!"}); //消息推送 }).catch(function (res) { modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":res}); }) }else{ modelUtil.emitData(self.eventEmitter,{"status":-1,"msg":"用户不在此会话当中!"}); } }) } /** * 非MUC模式创建会话 * @param sessionId 会话ID * @param name 会话名称 * @param type 会话类型 * @param users 会话成员 */ createSessions(sessionId,name,type,users){ if(type==2){ } let self = this; let createDate = new Date(); let session_key = super.makeRedisKey(RedisKeys.Session,sessionId); let participants = new Participants(); //将session加入reids participants.createParticipantsToRedis(sessionId,users.split(","),createDate,function(res){ if(!res){ modelUtil.emitData(self.eventEmitter, {"status":-1,"msg":res}); }else{ let messages={}; messages.senderId="system"; messages.senderName="系统消息"; messages.timestamp=createDate; messages.content="" messages.contentType="1"; self.updateLastContent(session_key,type,name,messages); modelUtil.emitData(self.eventEmitter, {"status":200,"msg":"session create success!"}); self.saveSessionToMysql(sessionId,name,type,createDate); //创建session成员到数据库 participants.createParticipantsToMysql(sessionId,users.split(",")); } }) } /** * 保存session到sql数据库 * @param sessionId * @param name * @param type * @param createDate */ saveSessionToMysql(sessionId,name,type,createDate){ SessionRepo.saveSessionToMysql(sessionId,name,type,createDate); } /** *置顶操作 */ stickSession(sessionId,user){ let user_session_key = super.makeRedisKey(RedisKeys.UsersSessions,user); let self = this; //取出最大的session redis.zrevrangeAsync(user_session_key,0,0).then(function(res){ //获取该session的时间搓 redis.zscoreAsync(user_session_key,res).then(function(scoreres){ let nowtime = new Date().getTime(); //当前时间搓比redis的时间搓更早证明没有置顶过 if(scoreres<=nowtime){ //初始化置顶 redis.zaddAsync(user_session_key, Commons.STICK_NUM,sessionId).then(function(res){ log.info("stickSession:"+sessionId+",res:"+res); modelUtil.emitData(self.eventEmitter,{"status":200,"msg":"置顶成功!"}); }) }else{ //已有置顶的数据,取出来加1保存回去 scoreres = scoreres+1; redis.zaddAsync(user_session_key, scoreres,sessionId).then(function(){ log.info("stickSession:"+sessionId+",res:"+res); modelUtil.emitData(self.eventEmitter,{"status":200,"msg":"置顶成功!"}); }) } }) }) } /** *取消置顶操作 */ cancelStickSession(sessionId,user){ let user_session_key = super.makeRedisKey(RedisKeys.UsersSessions,user); let participants_key = super.makeRedisKey(RedisKeys.Participants,sessionId); let self = this; redis.zscoreAsync(participants_key,user).then(function(res){ if(!res){ res = new Date().getTime(); } redis.zaddAsync(user_session_key, res,sessionId).then(function(res){ log.info("cancelStickSession:"+sessionId); modelUtil.emitData(self.eventEmitter,{"status":200,"msg":"取消置顶成功!"}); }); }) } } // Expose class module.exports = Sessions;