/** * 消息模型。 */ "use strict"; let MessageRepo = require('../../repository/mysql/message.repo'); let RedisModel = require('./../redis.model.js'); let SessionRepo = require('../../repository/mysql/session.repo'); let ParticipantRepo = require('../../repository/mysql/participant.repo'); let RedisClient = require('../../repository/redis/redis.client.js'); let ModelUtil = require('../../util/model.util'); var ObjectUtil = require("../../util/object.util.js"); let WechatClient = require("../client/wechat.client.js"); let AppClient = require("../client/app.client.js"); let mongoose = require('mongoose'); let redis = RedisClient.redisClient().connection; let log = require('../../util/log.js'); let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); let logger = require('../../util/log'); let pubSub = require("../redis/pubSub.js"); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; class Messages extends RedisModel { constructor() { super(); } /** * 根据topicId获取对应的议题的信息列表 * @param topicId */ getMessagesByTopicId(topicId) { } /** * 分页 * 根据topicId获取对应的议题的成员信息 * @param topicId * @param page * @param pagesize */ getMessagesByTopicIdForPage(topicId, page, pagesize) { } /** * 获取消息MySQL * @param sessionId * @param page * @param pagesize * @param handler */ getMessageFromMySQL(sessionId, page, pagesize, messageType, handler) { MessageRepo.findBySessionId(sessionId, page, pagesize, messageType, handler); } /** * 获取消息MySQL * @param sessionId * @param page * @param pagesize * @param messageType */ getMessageByType(sessionId, page, pagesize, messageType) { let self = this; page = parseInt(page); pagesize = parseInt(pagesize); MessageRepo.findBySessionId(sessionId, page, pagesize, messageType, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, {message: "Get message by session and type failed: " + err}); } else { ModelUtil.emitOK(self.eventEmitter, {data: res}); } }); } /** * 获取单条消息 * * @param messageId */ getMessageById(messageId) { } /** * 将消息保存至REDIS,并更新会话的最后状态,清理会话的过期消息。 * * PS:更新此处的消息缓存代码,记得更新Users.login() * * @param messageId * @param sessionId * @param sessionType * @param message */ saveMessageToRedis(sessionId, sessionType, messageId, message) { let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId); let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let msgJson = { id: message.id, sender_id: message.sender_id, sender_name: message.sender_name, timestamp: message.timestamp.getTime(), content_type: message.content_type, content: message.content, business_type:message.business_type||1 }; redis.multi() .hset(messageKey, messageId, JSON.stringify(msgJson)) // 保存消息 .zadd(messageTimestampKey, message.timestamp.getTime(), messageId) // 保存消息时间 .execAsync() .then(function (res) { Messages.updateLastContent(sessionKey, sessionType, null, message); SessionRepo.updateSessionLastStatus(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionId); Messages.cleanOutRangeMessage(sessionId); // clean out range messages }) .catch(function (ex) { log.error("Save message to redis failed: ", ex); }); } /** *创建session发送session成功 * @param messageId * @param sessionId * @param sessionType * @param message */ saveMessageToRedisFromCreateSession(sessionId, messageId, message) { let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId); let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let msgJson = { id: message.id, sender_id: message.sender_id, sender_name: message.sender_name, timestamp: message.timestamp.getTime(), content_type: message.content_type, content: message.content, business_type:message.business_type||1 }; redis.multi() .hset(messageKey, messageId, JSON.stringify(msgJson)) // 保存消息 .zadd(messageTimestampKey, message.timestamp.getTime(), messageId) // 保存消息时间 .execAsync() .then(function (res) { Messages.cleanOutRangeMessage(sessionId); // clean out range messages }) .catch(function (ex) { log.error("Save message to redis failed: ", ex); }); } /** * 内外网通信。 * 消息内容 存系统表 * @param message */ sendMessage(message) { let self = this; let messages = new Messages(); let messageId = mongoose.Types.ObjectId().toString(); let sessionType =0; message.id = messageId; message.session_id = "system"; messages.saveMessageToMysql("system", sessionType, messageId, message, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err}); } else { message.timestamp = message.timestamp.getTime(); let targetUserId = message.sender_name; //告知医生新消息 WechatClient.sendSocketMessageToDoctor(targetUserId,message); if(config.environment!='local'){//pc版不推送个推,通过redis的publish AppClient.sendNotification(targetUserId, message,sessionType,1); } //外网pcim通过socket推送 WechatClient.sendPcImSocket(targetUserId,message,sessionType); //redis发布消息 if(config.pubSubSwitch) {//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次 pubSub.publish(config.pubChannel,JSON.stringify(message)); } ModelUtil.emitOK(self.eventEmitter, {status: 200}); } }); } /** * 保存Message 到mysql * @param message 消息对象 * @param sessionType * @param messageId * @param sessionId * @param handler * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组 */ saveMessageToMysql(sessionId, sessionType, messageId, message, handler) { MessageRepo.save(message, sessionType, messageId, sessionId, handler); } /** * 更新会话最后一条消息 * * @param sessionKey rediskey * @param sessionType * @param name 议题名称 * @param message * @param businessType * @returns {*} */ static updateLastContent(sessionKey, sessionType, name, message) { redis.hmsetAsync(sessionKey, "create_date", message.timestamp.getTime(), "last_content", message.content, "last_content_type", message.content_type, "last_sender_id", message.sender_id, "last_sender_name", message.sender_name ); /** * mysql操作分离 */ // let sessionid = sessionKey.replace("sessions:", ""); // SessionRepo.updateSessionLastStatus(message.sender_id, // message.sender_name, // message.timestamp, // message.content, // message.content_type, // sessionid, function (err, res) { // if (err) logger.error("Update session last status failed: ", err); // }); if (name != null) redis.hsetAsync(sessionKey, "name", name); if (sessionType != null) redis.hsetAsync(sessionKey, "type", sessionType); } /** * 清理会话中超出范围的消息。 * * 目前只实现超出1000条数据清理的逻辑,7天前的消息清除逻辑暂不实现。 * * @param sessionId */ static cleanOutRangeMessage(sessionId) { let maxMessageCacheCount = config.sessionConfig.maxMessageCount; let messageById = this.makeRedisKey(REDIS_KEYS.Messages, sessionId); let messagesByTimestampKey = this.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); redis.zcardAsync(messagesByTimestampKey).then(function (count) { if (count > maxMessageCacheCount) { redis.zrevrangeAsync(messagesByTimestampKey, maxMessageCacheCount, count - maxMessageCacheCount).then(function (idList) { redis.zremAsync(messagesByTimestampKey, idList).then(function (res) { redis.hdel(messageById, idList); }) }); } }); } /** * * 清洗数据 * @param message */ dataMessage(sessionId) { let self = this; let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId); let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); MessageRepo.findBySessionId(sessionId, 0, config.sessionConfig.maxMessageCount, null, function (err, messages) { if (err) { ModelUtil.emitError(self.eventEmitter, {status: -1, message: "没有找到对应sessionId的聊天记录----"+sessionId}); return; } let multi = redis.multi(); if(messages){ messages.forEach(function (message) { let msgJson = { id: message.id, sender_id: message.sender_id, sender_name: message.sender_name, timestamp: ObjectUtil.timestampToLong(message.timestamp), content_type: message.content_type, content: message.content }; multi = multi.hset(messagesKey, message.id, JSON.stringify(msgJson)) .zadd(message_timestamp_key, ObjectUtil.timestampToLong(message.timestamp), message.id); }); } multi.execAsync() .then(function (res) { ModelUtil.emitOK(self.eventEmitter, {status:200,message:"存入redis成功!"}); }) .catch(function (ex) { log.error("Login failed while caching messages: ", ex); ModelUtil.emitOK(self.eventEmitter, res); return; }); }); } cleanMessageLastFetchTime(sessionId,userId) { let self = this; ParticipantRepo.findLastFetchTime(sessionId,userId,function(err,res){ if (err) { logger.error(err); return; } else { let last_fetch_time = new Date(res[0].last_fetch_time).getTime(); last_fetch_time = last_fetch_time + 1; let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId); redis.zaddAsync(participantsKey, last_fetch_time, userId) .then(function (res) { ModelUtil.emitOK(self.eventEmitter, {status:200,message:"存入redis成功!"}); }) .catch(function (err) { logger.error("Update participant last fetch time failed: ", err); ModelUtil.emitOK(self.eventEmitter, res); return; }); } }); } } // Expose class module.exports = Messages;