/** * 会话模型。 */ "use strict"; let RedisClient = require('../../repository/redis/redis.client.js'); let RedisModel = require('./../redis.model.js'); let ModelUtil = require('../../util/model.util'); let Messages = require('../messages/messages'); let Users = require('../user/users'); let Participants = require('./Participants'); let SessionRepo = require('../../repository/mysql/session.repo'); let ParticipantRepo = require('../../repository/mysql/participant.repo'); let WechatClient = require("../client/wechat.client.js"); let AppClient = require("../client/app.client.js"); let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); let redis = RedisClient.redisClient().connection; let logger = require('../../util/log.js'); let mongoose = require('mongoose'); let async = require("async"); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; const SESSION_TYPES = require('../../include/commons').SESSION_TYPES; const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE; const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE; class Sessions extends RedisModel { constructor() { super(); } /** * 创建会话。会话的ID来源: * MUC:患者的ID * P2P:对成员的ID排序后,取hash值 * GROUP:团队的ID * * @param sessionId * @param name 会话名称 * @param type 会话类型 * @param participantArray 会话成员 * @param handler 回调,仅MUC模式使用 */ createSession(sessionId, name, type, participantArray, handler) { let self = this; if (type == SESSION_TYPES.P2P) { var participantIdArray = []; for (let i in participantArray) { participantIdArray.push(participantArray[i].split(":")[0]); } if (participantIdArray.length != 2) { ModelUtil.emitDataNotFound(self.eventEmitter, {message: "P2P session only allow 2 participants."}); return false; } ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) { sessionId = res; callBusinessType(sessionId); }); } else { callBusinessType(sessionId); } function callBusinessType(sessionId) { var businessType = SESSION_BUSINESS_TYPE.DOCTOR; for (var j = 0; j < participantArray.length; j++) callIsPatient(j, businessType, participantArray.length); } function callIsPatient(j, businessType, length) { Users.isPatientId(participantArray[j], function (isPatient) { if (isPatient) { businessType = SESSION_BUSINESS_TYPE.PATIENT } if (length - 1 == j || businessType == SESSION_BUSINESS_TYPE.PATIENT) { callCreate(sessionId, businessType); } }) } function callCreate(sessionId, businessType) { SessionRepo.findOne(sessionId, function (err, res) { if (res.length > 0) { let session = res[0]; ModelUtil.emitOK(self.eventEmitter, { id: session.id, name: session.name, type: session.type, business_type: session.business_type || businessType, create_date: session.create_date }); return; } let createDate = new Date(); let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); // 保存会话及成员至MySQL中 self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) { Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } // 保存会话及成员至Redis中,并更新会话的最后状态 let isMucSession = SESSION_TYPES.MUC == type; let message = { sender_id: "System", sender_name: "System", content_type: 1, content: "", timestamp: createDate }; Messages.updateLastContent(sessionKey, type, name, message); Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) { if (isMucSession) { handler(true, sessionId); } else { ModelUtil.emitOK(self.eventEmitter, {id: sessionId}); } }); }); }); }); } } /** * 保存session到MySQL * @param sessionId * @param name * @param type * @param createDate * @param handler */ saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) { SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler); } /** * 获取某个用户的全部session列表 * @param userId * @param handler */ getUserSessionsFromMysql(userId, handler) { SessionRepo.findAll(userId, handler); } /** * 获取session单个对象 * @param sessionId * @param handler */ getSessions(sessionId, handler) { SessionRepo.findOne(sessionId, handler); } /** * 根据用户ID获取用户的session列表 * @param userId * @param page * @param size */ getUserSessions(userId, page, size) { let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId); let self = this; if (page > 0) { page = page * size; size = size + page; } async.waterfall([ // 获取会话ID列表 function (callback) { redis.zrevrangeAsync(userSessionKey, page, size) .then(function (sessionIds) { if (sessionIds.length == 0) { ModelUtil.emitOK(self.eventEmitter, []); return; } callback(null, sessionIds); }) }, // 遍历会话 function (sessionIds, callback) { let sessionList = []; sessionIds.forEach(function (sessionId) { let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId); let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); redis.multi() .hgetall(sessionKey) // 会话实体 .hget(participantsRoleKey, userId) // 用户在此会话中的角色 .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间 .execAsync() .then(function (res) { let session = res[0]; let role = res[1]; let lastFetchTime = res[2]; // 计算未读消息数 let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime()) .then(function (count) { sessionList.push({ id: sessionId, name: session.name, create_date: session.create_date, last_content_type: session.last_content_type, last_content: session.last_content, sender_id: session.sender_id, sender_name: session.sender_name, unread_count: count, my_role: role }); if (sessionId === sessionIds[sessionIds.length - 1]) { ModelUtil.emitOK(self.eventEmitter, sessionList); } }); }) .catch(function (err) { ModelUtil.emitError(self.eventEmitter, "Get sessions failed: " + err); }); }); } ]); } /** * 获取会话消息。全部,不管已读/未读状态。 * * @param sessionId 会话ID * @param userId 拉取消息的人 * @param page 第几页 * @param pagesize 分页数量 * @param start_msg_id 消息会话最新的一条消息的ID * @param end_msg_id 消息会话刚开始的消息ID */ getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize) { let self = this; let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); if (!start_msg_id && !end_msg_id) { redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) { if (res.length == 0) { ModelUtil.emitOK(self.eventEmitter, res); return; } start_msg_id = res[0]; redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) { if (res.length == 0) { ModelUtil.emitOK(self.eventEmitter, res); return; } end_msg_id = res[0]; self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, function (err, res) { if (err) { logger.error("getMessagesByPage error" + err); ModelUtil.emitError(self.eventEmitter, err, err); } else { ModelUtil.emitOK(self.eventEmitter, res); } }) }) }) } else if (!start_msg_id) { redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) { if (res.length == 0) { ModelUtil.emitOK(self.eventEmitter, res); return; } start_msg_id = res[0]; self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, function (err, res) { if (err) { logger.error("getMessagesByPage error" + err); ModelUtil.emitError(self.eventEmitter, err, err); } else { ModelUtil.emitOK(self.eventEmitter, res); } }) }) } else if (!end_msg_id) { redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) { if (res.length == 0) { ModelUtil.emitOK(self.eventEmitter, res); return; } end_msg_id = res[0]; self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, function (err, res) { if (err) { logger.error("getMessagesByPage error" + err); ModelUtil.emitError(self.eventEmitter, err, err); } else { ModelUtil.emitOK(self.eventEmitter, res); } }) }) } else { self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, function (err, res) { if (err) { logger.error("getMessagesByPage error" + err); ModelUtil.emitError(self.eventEmitter, err, err); } else { ModelUtil.emitOK(self.eventEmitter, res); } }) } } /** * 分页获取会话消息。 * * @param sessionId 必选。会话ID * @param userId 必选。用户ID * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据 * @param endMsgId 必选。会话中的结束消息ID * @param page 必选。页码 * @param size 必选。页面大小 * @param handler 必选。回调 */ getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, handler) { let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId); let participants = new Participants(); let offset = (page - 1 < 0 ? 0 : page - 1) * size; let count = size; if (page > 1 || startMsgId) {//翻页由于闭区间,需跳过本身数据 offset += 1; } participants.existsParticipant(sessionId, userId, function (err, res) { if (!res) { handler(Error("User not found in session " + sessionId), null); } else { //将消息ID转换成分值 redis.multi() .zscore(messagesTimestampKey, startMsgId) .zscore(messagesTimestampKey, endMsgId) .execAsync() .then(function (res) { let startMsgScore = res[1]; let endMsgScore = res[0]; if (startMsgScore == null || endMsgScore == null) { handler(null, []); return; } // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息 redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count) .then(function (res) { if (res.length == 0) { handler(null, []); return; } redis.hmgetAsync(messagesKey, res).then(function (messages) { handler(null, messages); }).then(function () { Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime()); }) }).catch(function (res) { handler(res, false); }) }) } }) } /** * 获取所有会话的未读消息数。 */ getAllSessionsUnreadMessageCount(userId) { let self = this; ModelUtil.emitError(self.eventEmitter, {message: "not implemented."}, null); } /** * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。 * * @param sessionId * @param userId */ getSessionUnreadMessageCount(sessionId, userId) { let self = this; let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); async.waterfall([ // 此成员最后获取消息的时间 function (callback) { redis.zscoreAsync(participantsKey, userId) .then(function (lastFetchTime) { callback(null, lastFetchTime); }) }, // 计算最后获取消息的时间之后到现在有多少条消息 function (lastFetchTime, callback) { if (!lastFetchTime) lastFetchTime = 0; let now = new Date().getTime(); redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now) .then(function (count) { ModelUtil.emitOK(self.eventEmitter, {count: count}); }) } ], function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.") } }); } /** * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。 */ getSessionUnreadMessages(sessionId, userId) { let self = this; let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); async.waterfall([ // 此成员最后获取消息的时间 function (callback) { redis.zscoreAsync(participantsKey, userId) .then(function (lastFetchTime) { callback(null, lastFetchTime); }) }, // 最后获取消息的时间之后到现在的消息ID列表 function (lastFetchTime, callback) { if (!lastFetchTime) lastFetchTime = 0; let now = new Date().getTime(); redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now) .then(function (messageIds) { callback(null, messageIds); }) }, // 获取消息 function (messageIds, callback) { if (messageIds.length == 0) { ModelUtil.emitOK(self.eventEmitter, []); return; } let startMsgId = messageIds[0]; let endMsgId = messageIds[messageIds.length - 1]; self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, err.message); return; } ModelUtil.emitOK(self.eventEmitter, res); }); } ], function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.") } }); } /** * 保存消息。 * * 也可以根据议题保存消息,但最终还是保存到与会话对象。 * * see also: saveMessageByTopic * * @param message * @param sessionId */ saveMessageBySession(sessionId, message) { let self = this; let messages = new Messages(); let participants = new Participants(); let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageId = mongoose.Types.ObjectId().toString(); message.id = messageId; // 检查会话中是否存在此成员 participants.existsParticipant(sessionId, message.sender_id, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, "Check session paticipant failed: ", err); return; } if (res) { redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) { let sessionType = res[0]; if (sessionType == null) { ModelUtil.emitError(self.eventEmitter, "Session with id " + sessionId + " not found."); return; } messages.saveMessageToRedis(sessionId, sessionType, messageId, message); Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime()); messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err}); } else { message.timestamp = message.timestamp.getTime(); ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]}); } }); }).then(function (res) { // 推送消息 ParticipantRepo.findIds(sessionId, function (err, res) { if (err) { ModelUtil.logError("Push message: get participant's id list failed: ", err); } else { message.session_id = sessionId; res.forEach(function (participant) { if (participant.id !== message.sender_id) { Sessions.pushNotification(participant.id, message); } }); } }) }).catch(function (err) { ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err}); }) } else { ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"}); } }); } /** * 保存消息 * * @param message * @param sessionId * @param handler */ saveMessageByTopic(message, sessionId, handler) { let messages = new Messages(); let participants = new Participants(); let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageId = mongoose.Types.ObjectId().toString(); let sessionType = 0; let sessionName = ""; message.id = messageId; // 发送成员必须处于会话中 participants.existsParticipant(sessionId, message.sender_id, function (err, res) { if (res) { redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) { sessionType = res[0]; sessionName = res[1]; if (!sessionType || !sessionName) { logger.error("session is error for key " + session_key); throw "session is not found"; } }).then(function (res) { // 更新消息存储 messages.saveMessageToRedis(sessionId, sessionType, messageId, message); messages.saveMessageToMysql(sessionId, sessionType, messageId, message); // 更新会话最新状态及成员最后一次消息获取时间 Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime()); Messages.updateLastContent(session_key, sessionType, sessionName, message); handler(null, messageId); }).then(function (res) { // TODO: 消息推送 }).catch(function (err) { handler(err, messageId) }) } else { handler("用户不在此会话当中!", messageId); } }) } /** * 置顶操作 */ stickSession(sessionId, user) { let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user); let self = this; //取出最大的session redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) { //获取该session的时间搓 redis.zscoreAsync(user_session_key, res).then(function (scoreres) { let nowtime = new Date().getTime(); //当前时间搓比redis的时间搓更早证明没有置顶过 if (scoreres <= nowtime) { //初始化置顶 redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) { logger.info("stickSession:" + sessionId + ",res:" + res); ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"}); }).then(function () { SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE); }) } else { //已有置顶的数据,取出来加1保存回去 scoreres = Number(scoreres) + 1; redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () { logger.info("stickSession:" + sessionId + ",res:" + res); ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"}); }).then(function () { SessionRepo.saveStickySession(sessionId, user, scoreres); }) } }) }) } /** * 取消会话置顶 */ cancelStickSession(sessionId, user) { let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user); let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); let self = this; redis.zscoreAsync(participants_key, user).then(function (res) { if (!res) { res = new Date().getTime(); } redis.zaddAsync(user_session_key, res, sessionId).then(function (res) { logger.info("cancelStickSession:" + sessionId); ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "取消置顶成功!"}); }).then(function () { SessionRepo.unstickSession(sessionId, user); }); }) } /** * 更新会话参与者的最后消息获取时间。 * * @param sessionId * @param userId */ static updateParticipantLastFetchTime(sessionId, userId, score) { let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); redis.zaddAsync(participantsKey, score, userId) .then(function (res) { ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) { if (err) { logger.error("Update participant last fetch time failed: ", err); } }); }) .catch(function (err) { logger.error("Update participant last fetch time failed: ", err); }); } /** * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。 * * @param targetUserId * @param message */ static pushNotification(targetUserId, message) { Users.isPatientId(targetUserId, function (err, isPatient) { if (isPatient) { WechatClient.sendMessage(targetUserId, message); } else { AppClient.sendNotification(targetUserId, message); } }); } } // Expose class module.exports = Sessions;