/** * 会话模型。 */ "use strict"; let RedisClient = require('../../repository/redis/redis.client.js'); let RedisModel = require('./../redis.model.js'); let ModelUtil = require('../../util/model.util'); let Messages = require('../messages/messages'); let Participants = require('./Participants'); let SessionRepo = require('../../repository/mysql/session.repo'); let ParticipantRepo = require('../../repository/mysql/participant.repo'); let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); let redis = RedisClient.redisClient().connection; let logger = require('../../util/log.js'); let mongoose = require('mongoose'); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; const SESSION_TYPES = require('../../include/commons').SESSION_TYPES; const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE; class Sessions extends RedisModel { constructor() { super(); } /** * 创建会话。会话的ID来源: * MUC:患者的ID * P2P:对成员的ID排序后,取hash值 * GROUP:团队的ID * * @param sessionId * @param name 会话名称 * @param type 会话类型 * @param participantArray 会话成员 * @param handler 回调,仅MUC模式使用 */ createSession(sessionId, name, type, participantArray, handler) { let self = this; if (type == SESSION_TYPES.P2P) { var participantIdArray = []; for (let i in participantArray) { participantIdArray.push(participantArray[i].split(":")[0]); } if (participantIdArray.length != 2) { ModelUtil.emitDataNotFound(self.eventEmitter, {message: "P2P session only allow 2 participants."}); return false; } ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) { sessionId = res; callCreate(sessionId); }); } else { callCreate(sessionId); } function callCreate(sessionId) { SessionRepo.findOne(sessionId, function (err, res) { if (res.length > 0) { let session = res[0]; ModelUtil.emitOK(self.eventEmitter, { id: session.id, name: session.name, type: session.type, create_date: session.create_date }); return; } let createDate = new Date(); let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); // 保存会话及成员至MySQL中 self.saveSessionToMysql(sessionId, name, type, createDate, function (err, res) { Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) { // 保存会话及成员至Redis中,并更新会话的最后状态 let isMucSession = SESSION_TYPES.MUC == type; let message = { sender_id: "System", sender_name: "System", content_type: 1, content: "", timestamp: createDate }; Messages.updateLastContent(sessionKey, type, name, message); Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) { if (isMucSession) { handler(true, sessionId); } else { ModelUtil.emitOK(self.eventEmitter, {id: sessionId}); } }); }); }); }); } } /** * 保存session到MySQL * @param sessionId * @param name * @param type * @param createDate * @param handler */ saveSessionToMysql(sessionId, name, type, createDate, handler) { SessionRepo.saveSession(sessionId, name, type, createDate, handler); } /** * 获取某个用户的全部session列表 * @param userId * @param handler */ getUserSessionsFromMysql(userId, handler) { SessionRepo.findAll(userId, handler); } /** * 获取session单个对象 * @param sessionId * @param handler */ getSessions(sessionId, handler) { SessionRepo.findOne(sessionId, handler); } /** * 根据用户ID获取用户的session列表 * @param userId * @param page * @param size */ getUserSessions(userId, page, size) { let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId); let self = this; if (page > 0) { page = page * size; size = size + page; } // 倒序获取 redis.zrevrangeAsync(userSessionKey, page, size).then(function (res) { let sessionList = []; if (res.length == 0) { ModelUtil.emitOK(self.eventEmitter, []); return; } for (let i in res) { callGetSessions(res[i], i == res.length - 1); } function callGetSessions(sessionId, lastOne) { let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); redis.hgetallAsync(sessionKey).then(function (session) { let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); // 对比当前用户最后一次此会话消息的时间与会话中最新的消息时间,以此判断未读消息数量 redis.zscoreAsync(sessionParticipantsKey, userId).then(function (lastFetchTime) { callGetUnreadCount(session, sessionId, lastFetchTime, lastOne); }) }).catch(function (err) { throw err; }) } /** * 统计未读消息数。以当前时间为准。 * * @param session 返回的会话列表 * @param sessionId 当前会话ID * @param lastFetchTime 当前会话当前用户的最后一次时间搓 * @param lastOne */ function callGetUnreadCount(session, sessionId, lastFetchTime, lastOne) { let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, (new Date().getTime())) .then(function (messagetimelist) { session.id = sessionId; session.unread_count = messagetimelist.length; callGetMyRole(session, sessionId, lastOne); }) .catch(function (err) { throw err; }); } /** * 获取用户在此会话中的角色。 * * @param session 要返回的JSON * @param sessionId * @param lastOne */ function callGetMyRole(session, sessionId, lastOne) { let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); redis.hgetAsync(participantsRoleKey, userId).then(function (role) { session.my_role = role; callback(session, lastOne); }) } /** * 列表封装完毕后由此回调返回数据界面 * * @param session * @param lastOne */ function callback(session, lastOne) { sessionList.push(session); if (lastOne) { ModelUtil.emitOK(self.eventEmitter, sessionList); } } }).catch(function (err) { ModelUtil.emitError(self.eventEmitter, {message: "Get sessions failed: " + err}); }) } /** * 根据会话中的消息 * * @param sessionId 会话ID * @param user 拉取消息的人 * @param page 第几页 * @param pagesize 分页数量 */ getMessages(sessionId, user, stratmsgid,endmsgid ) { let self = this; let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let message_key = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId); let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); //超过最大限制后从mysql获取数据 // if (page * pagesize >= config.sessionConfig.maxMessageCount) { // self.getMessageFromMySQL(sessionId, page, pagesize, function (err, res) { // if (!err) { // ModelUtil.emitOK(self.eventEmitter, {"status": 200, "data": res}); // } else { // ModelUtil.emitOK(self.eventEmitter, {"status": -1, "data": err}); // } // }) // } else { // if (page > 0) { // page = page * pagesize; // pagesize = pagesize + page; // } let participants = new Participants(); participants.existsParticipant(sessionId, user, function (res) { if (!res) { ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"}); } else { //倒序取出最后N条消息 redis.zrevrangebyscoreAsync(message_timestamp_key, endmsgid, stratmsgid).then(function (res) { //取出消息实体 if (res.length == 0) { ModelUtil.emitOK(self.eventEmitter, {"status": 200, "data": []}); return; } redis.hmgetAsync(message_key, res).then(function (messages) { console.log(messages) //将取到的消息返回给前端 ModelUtil.emitOK(self.eventEmitter, {"status": 200, "data": messages}); }).then(function () { //更新患者最后一次获取消息的日期 redis.zaddAsync(participants_key, (new Date().getTime()), user).then(function (res) { console.log(res); }).catch(function (res) { throw res; }) }) }).catch(function (res) { ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": res}); }) } }) // } } getAllSessionsUnreadMessageCount(){} /** * 获取会话的未读消息数。 * * @param sessionId * @param userId */ getSessionUnreadMessageCount(sessionId, userId){ } /** * 保存消息。 * * 也可以根据议题保存消息,但最终还是保存到与会话对象。 * * see also: saveMessageByTopic * * @param message * @param sessionId */ saveMessageBySession(sessionId, message) { let self = this; let messages = new Messages(); let participants = new Participants(); let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageId = mongoose.Types.ObjectId().toString(); // 检查会话中是否存在此成员 participants.existsParticipant(sessionId, message.sender_id, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Check session paticipant failed: ", err); return; } if (res) { redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) { let sessionType = res[0]; messages.saveMessageToRedis(sessionId, sessionType, messageId, message); messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err}); } else { ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]}); } }); }).then(function (res) { // TODO: 消息推送 }).catch(function (err) { ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err}); }) } else { ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"}); } }); } /** * 保存消息 * * @param message * @param sessionId */ saveMessageByTopic(message, sessionId, handler) { let self = this; let messages = new Messages(); let participants = new Participants(); let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageId = mongoose.Types.ObjectId().toString(); let sessionType = 0; let name = ""; participants.existsParticipant(sessionId, message.senderId, function (err, res) { //校验发送成员是都在讨论组 if (res) { redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) { sessionType = res[0]; name = res[1]; if (!sessionType || !name) { logger.error("session is error for key " + session_key); throw "session is not found"; } }).then(function (res) { //更新消息相关 return messages.saveMessageForRedis(messageId, sessionId, message); }).then(function (res) { //更新session的最后一条聊天记录 return Messages.updateLastContent(session_key, sessionType, name, message); }).then(function (res) { //操作mysql数据库 messages.saveMessageToMysql(sessionId, sessionType, messageId, message); //返回数据给前端。 handler(null, messageId) //消息推送 }).catch(function (res) { handler(res, messageId) }) } else { handler("用户不在此会话当中!", messageId); } }) } /** * 置顶操作 */ stickSession(sessionId, user) { let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user); let self = this; //取出最大的session redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) { //获取该session的时间搓 redis.zscoreAsync(user_session_key, res).then(function (scoreres) { let nowtime = new Date().getTime(); //当前时间搓比redis的时间搓更早证明没有置顶过 if (scoreres <= nowtime) { //初始化置顶 redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) { logger.info("stickSession:" + sessionId + ",res:" + res); ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"}); }).then(function () { SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE); }) } else { //已有置顶的数据,取出来加1保存回去 scoreres = Number(scoreres) + 1; redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () { logger.info("stickSession:" + sessionId + ",res:" + res); ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"}); }).then(function () { SessionRepo.saveStickySession(sessionId, user, scoreres); }) } }) }) } /** * 取消会话置顶 */ cancelStickSession(sessionId, user) { let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user); let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); let self = this; redis.zscoreAsync(participants_key, user).then(function (res) { if (!res) { res = new Date().getTime(); } redis.zaddAsync(user_session_key, res, sessionId).then(function (res) { logger.info("cancelStickSession:" + sessionId); ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "取消置顶成功!"}); }).then(function () { SessionRepo.unstickSession(sessionId, user); }); }) } } // Expose class module.exports = Sessions;