/** * Session model. * * NOTE(review): this file appears to have lost its original line breaks: many statements share one physical line, so each `//` comment also swallows the code that follows it on the same line. Restore the formatting before relying on it. */ "use strict"; let RedisClient = require('../../repository/redis/redis.client.js'); let RedisModel = require('./../redis.model.js'); let ModelUtil = require('../../util/model.util'); let Messages = require('../messages/messages'); let Users = require('../user/users'); let Participants = require('./participants'); let SessionRepo = require('../../repository/mysql/session.repo'); let TopicRepo = require('../../repository/mysql/topics.repo'); let ParticipantRepo = require('../../repository/mysql/participant.repo'); let WechatClient = require("../client/wechat.client.js"); let AppClient = require("../client/app.client.js"); let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); let redis = RedisClient.redisClient().connection; let logger = require('../../util/log.js'); let mongoose = require('mongoose'); let async = require("async"); let log = require("../../util/log.js"); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; const SESSION_TYPES = require('../../include/commons').SESSION_TYPES; const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE; const PARTICIPANT_ROLES = require('../../include/commons').PARTICIPANT_ROLES; const SESSION_STATUS = require('../../include/commons').SESSION_STATUS; class Sessions extends RedisModel { constructor() { super(); } /** * Create a session. It is written to MySQL first (createSessionToMysql) and, when that succeeds, to Redis (createSessionToRedis), after which updateSessionStatus(sessionId, 0) is called. Results and errors go to handler when one is supplied; otherwise they are emitted through self.eventEmitter. * * Where the session ID comes from: * MUC: the patient's ID * P2P: the hash of the sorted member IDs * GROUP: the team's ID * * @param sessionId * @param name session name * @param type session type * @param participantArray session members * @param handler callback, used only in MUC mode */ createSession(sessionId, name, type, participantArray, handler) { let self = this; let messageId = mongoose.Types.ObjectId().toString(); // Create the session in MySQL. self.createSessionToMysql(sessionId, name, type, participantArray, messageId, function (err, res) { if (err) { if (handler) { handler(err, null); return; } ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null); } else { 
// Create the session in Redis. self.createSessionToRedis(sessionId, name, type, participantArray, messageId, function (err, res) { if (err) { if (handler) { handler(err, null); return; } ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null); } else { // Mark the session as in progress. self.updateSessionStatus(sessionId,0); if (handler) { handler(null, res); return; } ModelUtil.emitOK(self.eventEmitter, {status: 200, data: res}); } }); } }); } /** * Create the session in Redis: stores the session hash and then saves the participants through Participants.saveParticipantsToRedis. P2P/SYSTEM sessions without a sessionId look the id up from the two participant ids; every other type requires a sessionId. MUC sessions always get business_type 2. * @param sessionId * @param name * @param type * @param participantArray entries are split on ":" and the first part is used as the participant id * @param messageId * @param handler called as handler(err, session) * @returns {boolean} false only when a P2P/SYSTEM session without a sessionId does not have exactly 2 participants; otherwise nothing is returned */ createSessionToRedis(sessionId, name, type, participantArray, messageId, handler) { let self = this; let messages = new Messages(); let participantIdArray = []; for (let i in participantArray) { participantIdArray.push(participantArray[i].split(":")[0]); } if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) { if (sessionId) { callBusinessType(sessionId); return; } else if (participantIdArray.length != 2) { handler("P2P session only allow 2 participants.", null); return false; }else{ ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) { sessionId = res; callBusinessType(sessionId); return; }); } } else { if (!sessionId) { handler("MUC OR GROUP session sessionId is not allow null .", null); return; } callBusinessType(sessionId); } function callBusinessType(sessionId) { ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) { callCreate(sessionId, businessType); }); } function callCreate(sessionId, businessType) { let createDate = new Date(); let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let message = { sender_id: "system", sender_name: "system", content_type: 11, content: "会话创建成功", timestamp: createDate, id: messageId }; if (type == SESSION_TYPES.MUC) { businessType = 2; } let session = { id: sessionId, name: name, type: type, create_date: createDate.getTime(), business_type: 
businessType, last_sender_id: message.sender_id, last_sender_name: message.sender_name, last_message_time: message.timestamp.getTime(), last_content: message.content, last_content_type: message.content_type }; redis.hmsetAsync(sessionKey, session).then(function () { Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) { handler(null, session); //messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message); }); }) } } /** * Create the session in MySQL. Steps: resolve the sessionId, resolve the business type, then either reuse the row returned by SessionRepo.findOne (only its participants are saved again) or save a new session followed by its participants. * @param sessionId * @param name * @param type * @param participantArray * @param messageId * @param handler called as handler(err, session) */ createSessionToMysql(sessionId, name, type, participantArray, messageId, handler) { let self = this; // If sessionId is missing, run the sessionId creation flow. let participantIdArray = []; for (let i in participantArray) { participantIdArray.push(participantArray[i].split(":")[0]); } // Step 1 - check whether sessionId exists; if not, create the corresponding sessionId; if (!sessionId) { if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) { ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) { sessionId = res; callBusinessType(); }); } else { return handler("MUC模式和团队模式,不允许sessionId为空!", null); } } else { callBusinessType(); } // Step 2 - determine the session's business type; function callBusinessType() { ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) { if (err) { handler(err, null); return; } callCreateSession(businessType); }); } // Step 3 - create the session and return the session instance. function callCreateSession(businessType) { // Look up whether this sessionId already exists; if it does, return that instance directly. SessionRepo.findOne(sessionId, function (err, res) { if (res.length > 0) {// Already exists. // Update the members. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, update) { handler(null, res[0]); return; }) } else { let createDate = new Date(); let session = { id: sessionId, name: name, type: type, create_date: createDate.getTime(), business_type: businessType }; // Write the session to the database. self.saveSessionToMysql(sessionId, name, type, createDate, 
businessType, function (err, res) { if (err) { handler(err, null); return; } callCreateParticipants(session); }) } }); } // Step 4 - create the session participants. function callCreateParticipants(session) { Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) { if (err) { handler(err, null); return; } else { handler(null, session); return; } }) } } /** * Recent session list. The original comment says "within 7 days"; the window actually used is whatever dateSpan is passed on to SessionRepo.findAllByTimestampAndType. Emits an array of {id, name, type, business_type, create_date} through self.eventEmitter. * * @param userId * @param dateSpan */ getRecentSessions(userId, dateSpan) { let self = this; SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err); return; } let sessions = []; res.forEach(function (session) { sessions.push({ id: session.id, name: session.name, type: session.type, business_type: session.business_type, create_date: session.create_date }) }); ModelUtil.emitOK(self.eventEmitter, sessions); }); } /** * Save the session to MySQL (delegates to SessionRepo.saveSession). * @param sessionId * @param name * @param type * @param createDate * @param businessType * @param handler */ saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) { SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler); } /** * Get the full session list of a user (delegates to SessionRepo.findAll). * @param userId * @param handler */ getUserSessionsFromMysql(userId, handler) { SessionRepo.findAll(userId, handler); } /** * Get a single session object (delegates to SessionRepo.findOne). * @param sessionId * @param handler */ getSessions(sessionId, handler) { SessionRepo.findOne(sessionId, handler); } /** * Get a user's session list by user ID. * NOTE(review): size is passed to zrevrange as the stop index, while Redis ZREVRANGE takes start and stop ranks - confirm the paging arithmetic. * @param userId * @param page * @param size * @param businessType */ getUserSessions(userId, page, size, businessType) { let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId); let self = this; if (page > 0) { if (page == 1) { page = 0; } page = page + page * size; } async.waterfall([ // Fetch the list of session IDs. function (callback) { redis.zrevrangeAsync(userSessionKey, page, size) .then(function (sessionIds) { if (sessionIds.length == 0) { ModelUtil.emitOK(self.eventEmitter, []); 
return; } callback(null, sessionIds); }) }, // Iterate over the sessions. function (sessionIds) { let sessionList = []; let functionList = []; for (let j = 0; j < sessionIds.length; j++) { let fun = function (index, callback) { if (!callback) { callback = index, index = 0 } let sessionId = sessionIds[index]; let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); redis.multi() .hgetall(sessionKey) // Session entity. .hget(participantsRoleKey, userId) // The user's role in this session. .zscore(sessionParticipantsKey, userId) // Time the user last fetched unread messages in this session. .zrange(sessionParticipantsKey, 0, -1) .zrange(sessionParticipantsKey, 0,-1,'withscores') // Time each user last fetched unread messages in this session. .execAsync() .then(function (res) { let session = res[0]; let role = res[1]; let lastFetchTime = res[2]; let users = res[3]; let participantsTimeArray = res[4]; let participantsTime = []; for(var j = 0 ;j /* NOTE(review): source text appears to be missing at this point. The rest of getUserSessions and the header of the method that owns the code below (presumably getMessagesByPage, which is called elsewhere in this file) are not present. Recover them from version control; do not reconstruct by guesswork. */ 1 || isoffset == 1) { offset += 1; // When paging, the range is a closed interval, so the boundary item itself must be skipped. } participants.existsParticipant(sessionId, userId, function (err, res) { if (!res) { handler(Error("User not found in session " + sessionId), null); } else { // Convert the message IDs to scores. redis.multi() .zscore(messagesTimestampKey, startMsgId) .zscore(messagesTimestampKey, endMsgId) .hgetall(sessionKey) .zrange(sessionParticipantsKey, 0, -1) .execAsync() .then(function (res) { let startMsgScore = res[1]; let endMsgScore = res[0]; /* NOTE(review): res[0] is the score of startMsgId and res[1] the score of endMsgId, so these two look swapped relative to the multi() order - confirm this is intended. */ let session = res[2]; let users = res[3]; if (startMsgScore == null || endMsgScore == null || (startMsgScore == endMsgScore && isoffset == 1)) { handler(null, []); return; } if(endMsgScore>startMsgScore){ redis.zrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count) .then(function (res) { if (res.length == 0) { handler(null, []); return; } redis.hmgetAsync(messagesKey, res).then(function (messages) { messages.reverse(); handler(null, messages); 
}).then(function () { Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime()); if(session.type != SESSION_TYPES.GROUP){ for(var j in users){ if(users[j]==userId)continue; WechatClient.sendAllRead(users[j],sessionId); } } }) }) .catch(function (err) { logger.error("Get message by page failed: ", err); handler(err, false); }) }else{ // Filter the message IDs to fetch from the message timestamp table, reading the messages in reverse order. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count) .then(function (res) { if (res.length == 0) { handler(null, []); return; } redis.hmgetAsync(messagesKey, res).then(function (messages) { handler(null, messages); }).then(function () { Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime()); if(session.type != SESSION_TYPES.GROUP){ for(var j in users){ if(users[j]==userId)continue; WechatClient.sendAllRead(users[j],sessionId); } } }) }) .catch(function (err) { logger.error("Get message by page failed: ", err); handler(err, false); }) } }) } }) } /** * Get the unread message count across all of the user's sessions. SYSTEM sessions are skipped; counts from sessions whose type == 2 are added to `patient`, all others to `doctor`. The totals are emitted when the callback for the last index fires. * NOTE(review): the per-session counts are fetched asynchronously, so emitting on the last index presumably assumes the callbacks complete in order - confirm. * @param userId */ getAllSessionsUnreadMessageCount(userId) { let self = this; let count = 0; let patientCount = 0; let doctorCount = 0; SessionRepo.findAll(userId, function (err, res) { if (err) { ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err); return; } if (res.length == 0) { ModelUtil.emitOK(self.eventEmitter, {count: count}); return; } for (let j in res) { if (res[j].type == SESSION_TYPES.SYSTEM) { if (j == res.length - 1) { ModelUtil.emitOK(self.eventEmitter, {count: count}); } continue; } callback(res, j, res[j]); } }); function callback(res, j, session) { self.getSessionUnreadMessageCount(res[j].id, userId, function (err, con) { if (err) { ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err); } count = count + con; if (session.type == 2) { patientCount = patientCount + con; } else { doctorCount = doctorCount + con; } if (j == res.length - 1) { ModelUtil.emitOK(self.eventEmitter, {count: count, patient: patientCount, 
doctor: doctorCount}); } }) } } /** * Get the unread message count of a session: the number of messages whose score lies between the member's last fetch time (0 when none is stored) and the current time. The count goes to handler when one is supplied; otherwise it is emitted through self.eventEmitter. * * @param sessionId * @param userId * @param handler optional, called as handler(err, count) */ getSessionUnreadMessageCount(sessionId, userId, handler) { let self = this; let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); async.waterfall([ // Time this member last fetched messages. function (callback) { redis.zscoreAsync(participantsKey, userId) .then(function (lastFetchTime) { callback(null, lastFetchTime); }) }, // Count how many messages there are between the last fetch time and now. function (lastFetchTime, callback) { if (!lastFetchTime) lastFetchTime = 0; let now = new Date().getTime(); redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now) .then(function (count) { if (handler) { handler(null, count); } else { ModelUtil.emitOK(self.eventEmitter, {count: count}); } }) } ], function (err, res) { if (err) { if (handler) { handler(err, 0); } else { ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.") } } }); } /** * Get the unread messages of a session: the messages between the member's last fetch time and the current time, loaded through getMessagesByPage and emitted through self.eventEmitter. * @param sessionId * @param userId */ getSessionUnreadMessages(sessionId, userId) { let self = this; let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); async.waterfall([ // Time this member last fetched messages. function (callback) { redis.zscoreAsync(participantsKey, userId) .then(function (lastFetchTime) { callback(null, lastFetchTime); }) }, // IDs of the messages between the last fetch time and now. function (lastFetchTime, callback) { if (!lastFetchTime) lastFetchTime = 0; let now = new Date().getTime(); redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now) .then(function (messageIds) { callback(null, messageIds); }) }, // Fetch the messages. function (messageIds, callback) { if (messageIds.length == 0) { ModelUtil.emitOK(self.eventEmitter, []); return; } let startMsgId = messageIds[0]; let endMsgId = messageIds[messageIds.length - 1]; 
self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } ModelUtil.emitOK(self.eventEmitter, res); }); } ], function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.") } }); } /** * Save a message. * * Messages can also be saved by topic, but they are ultimately stored against the session object. * * The sender must be a participant of the session. The message gets a freshly generated id, is written to Redis and MySQL, and is then pushed to the other participants whose participant_role is PARTICIPANT_ROLES.HOST. * * see also: saveMessageByTopic * * @param message * @param sessionId */ saveMessageBySession(sessionId, message) { let self = this; let messages = new Messages(); let participants = new Participants(); let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageId = mongoose.Types.ObjectId().toString(); message.id = messageId; // Check whether this member exists in the session. participants.existsParticipant(sessionId, message.sender_id, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Check session participant failed: ", err); return; } if (res) { redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) { let sessionType = res[0]; let sessionName = res[1]; if (sessionType == null) { ModelUtil.emitError(self.eventEmitter, "Session " + sessionId + " is not found."); return; } // Save the message to Redis and update the session's last state and the user's last message fetch time. messages.saveMessageToRedis(sessionId, sessionType, messageId, message); Messages.updateLastContent(sessionKey, sessionType, sessionName, message); Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime()); // Update the session's latest state in MySQL and save the message. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId); messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err}); } else { message.timestamp = message.timestamp.getTime(); ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]}); } }); 
}).then(function (res) { // Push the message. NOTE(review): this step also runs after the "Session ... is not found." early return in the previous step - confirm that is intended. ParticipantRepo.findIds(sessionId, function (err, res) { if (err) { ModelUtil.logError("Push message from session: get participant's id list failed: ", err); } else { message.session_id = sessionId; res.forEach(function (participant) { if (participant.id == message.sender_id){ message.sender_img = participant.avatar; callPush(res,message); } }) } }) }).catch(function (err) { ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err}); }) } else { ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"}); } }); function callPush(participants,message){ participants.forEach(function (participant) { if (participant.id !== message.sender_id && participant.participant_role == PARTICIPANT_ROLES.HOST) { Sessions.pushNotification(participant.id, participant.name, message); } }); } } /** * Send a message to a topic: looks the topic up through TopicRepo.findAllByTopicId and saves the message to the session_id of the first result via saveMessageByTopic. Both success and failure are reported with ModelUtil.emitOK, distinguished by the status field (-1 or 200). * @param topicId * @param message */ sendTopicMessages(topicId, message) { let self = this; TopicRepo.findAllByTopicId(topicId, function (err, res) { if (err || res.length == 0) { ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": "议题获取失败"}); return; } self.saveMessageByTopic(message, res[0].session_id, function (err, messageId) { if (err) { ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": err}); } else { message.id = messageId; ModelUtil.emitOK(self.eventEmitter, {status: 200, "message": "发送成功", data: message}); } }); }); } /** * Save a message on behalf of a topic. The sender must be a participant of the session; the message gets a freshly generated id, is written to Redis and MySQL, and is pushed to the other participants whose participant_role is PARTICIPANT_ROLES.HOST. * * @param message * @param sessionId * @param handler optional, called as handler(err, messageId) */ saveMessageByTopic(message, sessionId, handler) { let messages = new Messages(); let participants = new Participants(); let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageId = mongoose.Types.ObjectId().toString(); let self = this; let sessionType = 0; let sessionName = ""; message.id = messageId; // The sender must be a member of the session. participants.existsParticipant(sessionId, message.sender_id, function (err, res) { if (res) { redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) { sessionType = res[0]; 
sessionName = res[1]; if (!sessionType || !sessionName) { logger.error("Unknown session key " + session_key); if (handler) {handler(new Error("Unknown session key " + session_key));return;} ; } }).then(function (res) { // Write the message to both stores, and update the user's last message fetch time, the session's new state, etc. NOTE(review): the return in the previous step only leaves that callback, so this step still runs for an unknown session key and handler can presumably be invoked a second time - confirm. messages.saveMessageToRedis(sessionId, sessionType, messageId, message); messages.saveMessageToMysql(sessionId, sessionType, messageId, message); // Update the session's latest state and the member's last message fetch time. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime()); Messages.updateLastContent(session_key, sessionType, sessionName, message); SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId); if (handler) {handler(null, messageId);return;} }).then(function (res) { // Push the message. ParticipantRepo.findIds(sessionId, function (err, res) { if (err) { logger.error(err); } else { message.session_id = sessionId; res.forEach(function (participant) { if (participant.id == message.sender_id){ message.sender_img = participant.avatar; callPush(res,message); } }) } }) }).catch(function (err) { if (handler) { handler(err, messageId);return;} }) } else { if (handler){ handler("用户不在此会话当中!", messageId);return;} } }); function callPush(participants,message){ participants.forEach(function (participant) { if (participant.id !== message.sender_id && participant.participant_role == PARTICIPANT_ROLES.HOST) { Sessions.pushNotification(participant.id, participant.name, message); } }); } } /** * Stick (pin) a session to the top of the user's session list by giving it a score above the current top entry in the user's sorted set, then saving the score through SessionRepo.saveStickySession. * @param sessionId * @param user */ stickSession(sessionId, user) { let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user); let self = this; // Fetch the session with the highest score. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) { // Get that session's timestamp score. NOTE(review): res is the array returned by zrevrange, passed to zscore as the member - confirm. redis.zscoreAsync(user_session_key, res).then(function (scoreres) { let nowtime = new Date().getTime(); // A top score that is not later than the current time means nothing has been stuck yet. if (scoreres <= nowtime) { // First sticky entry: start from the base score. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, 
sessionId).then(function (res) { logger.info("stickSession:" + sessionId + ",res:" + res); ModelUtil.emitOK(self.eventEmitter, {}); }).then(function () { SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE); }) } else { // Sticky entries already exist: take the top score, add 1 and store it back. scoreres = Number(scoreres) + 1; redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () { logger.info("stickSession:" + sessionId + ",res:" + res); ModelUtil.emitOK(self.eventEmitter, {}); }).then(function () { SessionRepo.saveStickySession(sessionId, user, scoreres); }) } }) }) } /** * Cancel the sticky state of a session: the session's score in the user's sorted set is reset to the user's score in the session participants set (or to the current time when there is none), then SessionRepo.unstickSession is called. * @param sessionId * @param user */ cancelStickSession(sessionId, user) { let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user); let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); let self = this; redis.zscoreAsync(participants_key, user).then(function (res) { if (!res) { res = new Date().getTime(); } redis.zaddAsync(user_session_key, res, sessionId).then(function (res) { logger.info("cancelStickSession:" + sessionId); ModelUtil.emitOK(self.eventEmitter, {}); }).then(function () { SessionRepo.unstickSession(sessionId, user); }); }) } /** * Update a session participant's last message fetch time. The score is incremented by 1 before it is stored as the participant's score in Redis and passed, as a Date, to ParticipantRepo.updateLastFetchTime. Failures are only logged. * * @param sessionId * @param userId * @param score timestamp in milliseconds */ static updateParticipantLastFetchTime(sessionId, userId, score) { score = score + 1; let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); redis.zaddAsync(participantsKey, score, userId) .then(function (res) { ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) { if (err) { logger.error("Update participant last fetch time failed: ", err); } }); }) .catch(function (err) { logger.error("Update participant last fetch time failed: ", err); }); } /** * Push a notification to a user. Users that Users.isPatientId reports as patients receive the message through WechatClient.sendMessage; everyone else goes through WechatClient.sendReadDoctorByDoctorId and AppClient.sendNotification. (The original comment describes this as: WeChat users get the message pushed directly, app users get a notification through the Getui push service.) * * @param targetUserId * @param targetUserName * @param message */ static pushNotification(targetUserId, targetUserName, message) { Users.isPatientId(targetUserId, function (err, isPatient) { if (isPatient) { WechatClient.sendMessage(targetUserId, 
targetUserName, message); } else { WechatClient.sendReadDoctorByDoctorId(targetUserId, message); AppClient.sendNotification(targetUserId, message); } }); } /** * Update the current status of a session (the original comment says this is for MUC mode): sets the "status" field of the session hash in Redis, then calls SessionRepo.updateSessionStatus and logs the outcome. * @param sessionId * @param status */ updateSessionStatus(sessionId,status){ let self = this; let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session,sessionId); redis.hsetAsync(sessionKey,"status",status).then(function(res){ SessionRepo.updateSessionStatus(sessionId,status,function(err,sqlResult){ if(err){ logger.error("set session status to mysql is error !"); }else{ logger.info("set session status is success"); } }); }); } } // Expose class module.exports = Sessions;