/**
 * Session model.
 */
"use strict";

let RedisClient = require('../../repository/redis/redis.client.js');
let redisClient = RedisClient.redisClient();
let redis = redisClient.connection;
let RedisModel = require('./../redis.model.js');
let modelUtil = require('../../util/model.util');
let Messages = require('../messages/messages');
let Participants = require('./Participants');
let log = require('../../util/log.js');
const RedisKeys = require('../../include/commons').REDIS_KEYS;
const Commons = require('../../include/commons');
let configFile = require('../../include/commons').CONFIG_FILE;
let config = require('../../resources/config/' + configFile);
let SessionRepo = require('../../repository/mysql/session.repo');
let ParticipantRepo = require('../../repository/mysql/participant.repo');
let mongoose = require('mongoose');

/**
 * Chat-session operations backed by Redis, mirrored to MySQL through the
 * repository modules. Results are reported by emitting a payload on
 * `this.eventEmitter` via `modelUtil.emitData`, not by return value.
 */
class Sessions extends RedisModel {
    constructor() {
        super();
    }

    /**
     * Create a session.
     *
     * type = 1: sessionId = md5(patientId); MUC
     * type = 2: sessionId = hash(user1, user2); P2P
     * type = 3: sessionId = groupId; team group chat
     *
     * @param sessionId session ID (replaced by the repository lookup result for P2P)
     * @param name      session name
     * @param type      session type
     * @param users     session members, as a string that evaluates to an object
     *                  whose keys are the participant IDs
     * @returns {boolean|undefined} `false` when a P2P session has more than two
     *                  members, otherwise nothing
     */
    createSession(sessionId, name, type, users) {
        // Arrow wrapper keeps `super`/`this` bound; the bare method reference
        // used previously was invoked without a receiver.
        const makeKey = (template, id) => super.makeRedisKey(template, id);

        // NOTE(security): `users` is evaluated as code. If it can ever carry
        // untrusted input this is an injection vector and should be replaced by
        // JSON.parse once all callers are confirmed to send strict JSON.
        users = eval("[" + users + "]")[0];

        const create = () => {
            const createDate = new Date();
            const sessionKey = makeKey(RedisKeys.Session, sessionId);
            const participants = new Participants();

            // Register the participants in Redis first; everything else depends on it.
            participants.saveParticipantsToRedis(sessionId, users, createDate, (res) => {
                if (!res) {
                    modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": res});
                    return;
                }

                // Seed the session hash with an empty system message.
                const messages = {
                    senderId: "system",
                    senderName: "系统消息",
                    timestamp: createDate,
                    content: "",
                    contentType: "1"
                };
                this.updateLastContent(sessionKey, type, name, messages).catch((err) => {
                    log.error("createSession: update last content failed for " + sessionKey + ": " + err);
                });

                modelUtil.emitData(this.eventEmitter, {"status": 200, "msg": "session create success!"});
                this.saveSessionToMysql(sessionId, name, type, createDate);
                // Persist the session members to the database.
                participants.saveParticipantsToMysql(sessionId, users);
            });
        };

        // Loose equality on purpose: `type` may arrive as a string.
        if (type == 2) {
            // P2P sessions derive their ID from the two participants.
            const userArray = Object.keys(users || {});
            if (userArray.length > 2) {
                modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": "会话人数超过2个无法创建P2P会话!"});
                return false;
            }

            // Both participants must be supplied; passing the first one twice
            // ignored the second participant entirely.
            ParticipantRepo.findSessionIdByParticipantIds(userArray[0], userArray[1], (err, res) => {
                if (err) {
                    modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": err});
                    return;
                }
                sessionId = res;
                create();
            });
        } else {
            create();
        }
    }

    /**
     * Save a session to MySQL.
     *
     * @param sessionId
     * @param name
     * @param type
     * @param createDate
     */
    saveSessionToMysql(sessionId, name, type, createDate) {
        SessionRepo.saveSession(sessionId, name, type, createDate);
    }

    /**
     * Fetch all sessions of a user from MySQL.
     *
     * @param userId
     * @param handler callback forwarded to the repository
     */
    getUserSessionsFromMysql(userId, handler) {
        SessionRepo.findAll(userId, handler);
    }

    /**
     * Fetch a single session.
     *
     * @param sessionId
     * @param handler callback forwarded to the repository
     */
    getSessions(sessionId, handler) {
        SessionRepo.findOne(sessionId, handler);
    }

    /**
     * Emit the session list of a user, read from Redis in descending score order.
     * Each entry is the session hash extended with `sessionId`, `message`
     * (number of messages since the user's score in the participants set) and
     * `role`.
     *
     * @param userId
     * @param page
     * @param pagesize
     */
    getUserSessions(userId, page, pagesize) {
        const makeKey = (template, id) => super.makeRedisKey(template, id);
        const userSessionKey = makeKey(RedisKeys.UserSessions, userId);

        // NOTE(review): ZREVRANGE's stop index is inclusive, so this window
        // holds pagesize + 1 entries and adjacent pages overlap by one. Confirm
        // whether clients rely on that before changing it.
        if (page > 0) {
            page = page * pagesize;
            pagesize = pagesize + page;
        }

        // Load one session entry: hash -> last-fetch time -> message count -> role.
        const loadSession = (sessionId) => {
            const sessionKey = makeKey(RedisKeys.Session, sessionId);
            const participantsKey = makeKey(RedisKeys.Participants, sessionId);
            const messageTimeKey = makeKey(RedisKeys.MessagesByTimestamp, sessionId);
            const roleKey = makeKey(RedisKeys.ParticipantsRole, sessionId);
            let entry;

            return redis.hgetallAsync(sessionKey).then((hash) => {
                entry = hash;
                // Score of the current user in the participants set.
                return redis.zscoreAsync(participantsKey, userId);
            }).then((lastFetch) => {
                // Messages between that score and now.
                return redis.zrangebyscoreAsync(messageTimeKey, lastFetch, (new Date().getTime()));
            }).then((messageIds) => {
                entry.sessionId = sessionId;
                entry.message = messageIds.length;
                return redis.hgetAsync(roleKey, userId);
            }).then((role) => {
                entry.role = role;
                return entry;
            });
        };

        // Descending order.
        redis.zrevrangeAsync(userSessionKey, page, pagesize).then((sessionIds) => {
            if (sessionIds.length == 0) {
                modelUtil.emitData(this.eventEmitter, {"status": 200, "data": sessionIds});
                return;
            }

            // Wait for every entry, so the list is complete and keeps the
            // sorted-set order regardless of which lookup finishes first.
            return Promise.all(sessionIds.map(loadSession)).then((sessionList) => {
                modelUtil.emitData(this.eventEmitter, {"status": 200, "data": sessionList});
            });
        }).catch((err) => {
            modelUtil.emitData(this.eventEmitter, "get list error " + err + ",user:" + userId);
        });
    }

    /**
     * Emit one page of messages of a session.
     *
     * @param sessionId session ID
     * @param user      the user fetching the messages
     * @param page      page index
     * @param pagesize  page size
     */
    getMessages(sessionId, user, page, pagesize) {
        const messageTimestampKey = super.makeRedisKey(RedisKeys.MessagesByTimestamp, sessionId);
        const messageKey = super.makeRedisKey(RedisKeys.Messages, sessionId);
        const participantsKey = super.makeRedisKey(RedisKeys.Participants, sessionId);

        // Beyond the configured limit the data is read from MySQL instead.
        if (page * pagesize >= config.sessionConfig.maxMessageCount) {
            const message = new Messages();
            message.getMessageByPage(sessionId, page, pagesize, (err, res) => {
                if (!err) {
                    modelUtil.emitData(this.eventEmitter, {"status": 200, "data": res});
                } else {
                    modelUtil.emitData(this.eventEmitter, {"status": -1, "data": err});
                }
            });
            return;
        }

        // NOTE(review): ZREVRANGE's stop index is inclusive, so this window
        // holds pagesize + 1 entries and adjacent pages overlap by one. Confirm
        // whether clients rely on that before changing it.
        if (page > 0) {
            page = page * pagesize;
            pagesize = pagesize + page;
        }

        const participants = new Participants();
        participants.existsParticipant(sessionId, user, (exists) => {
            if (!exists) {
                modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"});
                return;
            }

            // Take the latest N message IDs in descending order.
            redis.zrevrangeAsync(messageTimestampKey, page, pagesize).then((messageIds) => {
                if (messageIds.length == 0) {
                    modelUtil.emitData(this.eventEmitter, {"status": 200, "data": []});
                    return;
                }

                // Returned so that a failed lookup reaches the catch below.
                return redis.hmgetAsync(messageKey, messageIds).then((messages) => {
                    // Hand the message entities back to the client.
                    modelUtil.emitData(this.eventEmitter, {"status": 200, "data": messages});

                    // Record when this user last fetched messages. The response
                    // has already been sent, so a failure here is only logged.
                    redis.zaddAsync(participantsKey, (new Date().getTime()), user).catch((err) => {
                        log.error("getMessages: failed to update last fetch time for " + user + ": " + err);
                    });
                });
            }).catch((err) => {
                modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": err});
            });
        });
    }

    /**
     * Update the last-message summary stored in the session hash.
     *
     * @param session_key  Redis key of the session hash
     * @param session_type session type
     * @param name         session name
     * @param message      message providing timestamp, content, contentType,
     *                     senderId and senderName
     * @returns {*} the promise returned by `redis.hmsetAsync`
     */
    updateLastContent(session_key, session_type, name, message) {
        return redis.hmsetAsync(session_key,
            "create_date", message.timestamp,
            "last_content", message.content,
            "last_content_type", message.contentType,
            "type", session_type,
            "senderId", message.senderId,
            "senderName", message.senderName,
            "name", name
        );
    }

    /**
     * Save a message sent to a session.
     *
     * @param message   the message; `message.senderId` must be a session member
     * @param sessionId session ID
     */
    saveMessageBySession(message, sessionId) {
        const messages = new Messages();
        const participants = new Participants();
        const sessionKey = super.makeRedisKey(RedisKeys.Session, sessionId);
        const messageId = mongoose.Types.ObjectId().toString();

        // Verify the sender belongs to the session.
        participants.existsParticipant(sessionId, message.senderId, (exists) => {
            if (!exists) {
                modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": "用户不在此会话当中!"});
                return;
            }

            let sessionType = 0;
            let name = "";

            redis.hmgetAsync(sessionKey, ["type", "name"]).then((res) => {
                sessionType = res[0];
                name = res[1];
                if (!sessionType || !name) {
                    log.error("session is error for key " + sessionKey);
                    // Thrown as a plain string because the catch below forwards
                    // the value verbatim as the emitted `msg`.
                    throw "session is not found";
                }

                // Store the message in Redis.
                return messages.saveMessageForRedis(messageId, sessionId, message);
            }).then(() => {
                // Refresh the session's last chat record.
                return this.updateLastContent(sessionKey, sessionType, name, message);
            }).then(() => {
                // Persist to MySQL, then answer the client.
                messages.saveMessageToMysql(message, sessionType, messageId, sessionId);
                modelUtil.emitData(this.eventEmitter, {"status": 200, "msg": "发送成功!"});
            }).catch((err) => {
                modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": err});
            });
        });
    }

    /**
     * Pin a session to the top of a user's session list.
     *
     * @param sessionId session to pin
     * @param user      owner of the session list
     */
    stickSession(sessionId, user) {
        const userSessionKey = super.makeRedisKey(RedisKeys.UserSessions, user);

        // Look at the highest-scored session of the user.
        redis.zrevrangeAsync(userSessionKey, 0, 0).then((top) => {
            // An empty list has no score to compare against.
            if (top.length == 0) {
                return null;
            }
            // ZSCORE expects the member itself, not the one-element result array.
            return redis.zscoreAsync(userSessionKey, top[0]);
        }).then((topScore) => {
            const now = new Date().getTime();
            // A top score not later than "now" means nothing is pinned yet, so
            // start from the base score; otherwise go one above the current top.
            const score = Number(topScore) <= now
                ? Commons.STICKY_SESSION_BASE_SCORE
                : Number(topScore) + 1;

            return redis.zaddAsync(userSessionKey, score, sessionId).then((res) => {
                log.info("stickSession:" + sessionId + ",res:" + res);
                modelUtil.emitData(this.eventEmitter, {"status": 200, "msg": "置顶成功!"});
                SessionRepo.saveStickySession(sessionId, user, score);
            });
        }).catch((err) => {
            log.error("stickSession failed for " + sessionId + ": " + err);
            modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": err});
        });
    }

    /**
     * Unpin a session.
     *
     * @param sessionId session to unpin
     * @param user      owner of the session list
     */
    cancelStickSession(sessionId, user) {
        const userSessionKey = super.makeRedisKey(RedisKeys.UserSessions, user);
        const participantsKey = super.makeRedisKey(RedisKeys.Participants, sessionId);

        redis.zscoreAsync(participantsKey, user).then((score) => {
            // Fall back to the current time when the user has no score in the
            // participants set.
            if (!score) {
                score = new Date().getTime();
            }
            return redis.zaddAsync(userSessionKey, score, sessionId);
        }).then(() => {
            log.info("cancelStickSession:" + sessionId);
            modelUtil.emitData(this.eventEmitter, {"status": 200, "msg": "取消置顶成功!"});
            SessionRepo.unstickSession(sessionId, user);
        }).catch((err) => {
            log.error("cancelStickSession failed for " + sessionId + ": " + err);
            modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": err});
        });
    }
}

// Expose class
module.exports = Sessions;