/** * 消息模型。 */ "use strict"; let MessageRepo = require('../../repository/mysql/message.repo'); let RedisModel = require('./../redis.model.js'); let SessionRepo = require('../../repository/mysql/session.repo'); let RedisClient = require('../../repository/redis/redis.client.js'); let ModelUtil = require('../../util/model.util'); var ObjectUtil = require("../../util/object.util.js"); let redis = RedisClient.redisClient().connection; let log = require('../../util/log.js'); let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); let logger = require('../../util/log'); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; class Messages extends RedisModel { constructor() { super(); } /** * 根据topicId获取对应的议题的信息列表 * @param topicId */ getMessagesByTopicId(topicId) { } /** * 分页 * 根据topicId获取对应的议题的成员信息 * @param topicId * @param page * @param pagesize */ getMessagesByTopicIdForPage(topicId, page, pagesize) { } /** * 获取消息MySQL * @param sessionId * @param page * @param pagesize * @param handler */ getMessageFromMySQL(sessionId, page, pagesize, messageType, handler) { MessageRepo.findBySessionId(sessionId, page, pagesize, messageType, handler); } /** * 获取消息MySQL * @param sessionId * @param page * @param pagesize * @param handler */ getMessageByType(sessionId, page, pagesize, messageType) { let self = this; page = parseInt(page); pagesize = parseInt(pagesize); MessageRepo.findBySessionId(sessionId, page, pagesize, messageType, function (err, res) { if (err) { ModelUtil.emitError(self.eventEmitter, {message: "Error get message by session and type : " + err}); } else { ModelUtil.emitOK(self.eventEmitter, {data: res}); } }); } /** * 获取单条消息 * * @param messageId */ getMessageById(messageId) { } /** * 将消息保存至REDIS,并更新会话的最后状态,清理会话的过期消息。 * * PS:更新此处的消息缓存代码,记得更新Users.login() * * @param messageId * @param sessionId * @param sessionType * @param message */ saveMessageToRedis(sessionId, sessionType, messageId, message) { let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId); let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); let msgJson = { id: message.id, sender_id: message.sender_id, sender_name: message.sender_name, timestamp: message.timestamp.getTime(), content_type: message.content_type, content: message.content }; redis.multi() .hset(messageKey, messageId, JSON.stringify(msgJson)) // 保存消息 .zadd(messageTimestampKey, message.timestamp.getTime(), messageId) // 保存消息时间 .execAsync() .then(function (res) { Messages.updateLastContent(sessionKey, sessionType, null, message); Messages.cleanOutRangeMessage(sessionId); // clean out range messages }); } /** * 保存Message 到mysql * @param message 消息对象 * @param sessionType * @param messageId * @param sessionId * @param handler * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组 */ saveMessageToMysql(sessionId, sessionType, messageId, message, handler) { MessageRepo.save(message, sessionType, messageId, sessionId, handler); } /** * 更新会话最后一条消息 * * @param sessionKey rediskey * @param sessionType * @param name 议题名称 * @param message * @param businessType * @returns {*} */ static updateLastContent(sessionKey, sessionType, name, message, businessType) { redis.hmsetAsync(sessionKey, "create_date", message.timestamp.getTime(), "last_content", message.content, "last_content_type", message.content_type, "sender_id", message.sender_id, "sender_name", message.sender_name ); let sessionid = sessionKey.replace("sessions:", ""); SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionid, function (err, res) { if (err) logger.error("Update session last status failed: ", err); }); if (name != null) redis.hsetAsync(sessionKey, "name", name); if (sessionType != null) redis.hsetAsync(sessionKey, "type", sessionType); if (businessType != null) redis.hsetAsync(sessionKey, "business_type", businessType); } /** * 清理会话中超出范围的消息。 * * 目前只实现超出1000条数据清理的逻辑,7天前的消息清除逻辑暂不实现。 * * @param sessionId */ static cleanOutRangeMessage(sessionId) { let maxMessageCacheCount = config.sessionConfig.maxMessageCount; let messageById = this.makeRedisKey(REDIS_KEYS.Messages, sessionId); let messagesByTimestampKey = this.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); redis.zcardAsync(messagesByTimestampKey).then(function (count) { if (count > maxMessageCacheCount) { redis.zrevrangeAsync(messagesByTimestampKey, 0, count - maxMessageCacheCount).then(function (idList) { redis.zremAsync(messagesByTimestampKey, idList).then(function (res) { redis.hdel(messageById, idList); }) }); } }); } } // Expose class module.exports = Messages;