/** * 消息模型。 */ "use strict"; let MessageRepo = require('../../repository/mysql/message.repo'); let RedisModel = require('./../redis.model.js'); let RedisClient = require('../../repository/redis/redis.client.js'); let Sessions = require('../sessions/sessions'); let redis = RedisClient.redisClient().connection; let log = require('../../util/log.js'); let configFile = require('../../include/commons').CONFIG_FILE; let config = require('../../resources/config/' + configFile); const REDIS_KEYS = require('../../include/commons').REDIS_KEYS; class Messages extends RedisModel { constructor() { super(); } /** * 根据topicId获取对应的议题的信息列表 * @param topicId */ getMessagesByTopicId(topicId) { } /** * 分页 * 根据topicId获取对应的议题的成员信息 * @param topicId * @param page * @param pagesize */ getMessagesByTopicIdForPage(topicId, page, pagesize) { } /** * 获取消息MySQL * @param sessionId * @param page * @param pagesize * @param handler */ getMessageFromMySQL(sessionId, page, pagesize, handler) { MessageRepo.findBySessionId(sessionId, page, pagesize, handler); } /** * 获取单条消息 * * @param messageId */ getMessageById(messageId) { } /** * 将消息保存至REDIS,并更新会话的最后状态。也会清理会话的过期消息。 * * @param messageId * @param sessionId * @param sessionType * @param message */ saveMessageToRedis(sessionId, sessionType, messageId, message) { let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId); let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId); let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); redis.multi() .hset(messageKey, messageId, JSON.stringify(message)) .zadd(messageTimestampKey, message.timestamp.getTime(), messageId) .execAsync() .then(function (res) { Messages.updateLastContent(sessionKey, sessionType, null, message); Messages.cleanOutRangeMessage(sessionId); // clean out range messages }); } /** * 保存Message 到mysql * @param message 消息对象 * @param sessionType * @param messageId * @param sessionId * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组 */ saveMessageToMysql(sessionId, sessionType, messageId, message, handler) { MessageRepo.save(message, sessionType, messageId, sessionId, handler); } /** * 更新会话最后一条消息 * * @param sessionKey rediskey * @param sessionType * @param name 议题名称 * @param message * @returns {*} */ static updateLastContent(sessionKey, sessionType, name, message) { redis.hmsetAsync(sessionKey, "create_date", message.timestamp, "last_content", message.content, "last_content_type", message.content_type, "sender_id", message.sender_id, "sender_name", message.sender_name ); if(name != null) redis.hsetAsync(sessionKey, "name", name); if(sessionType != null) redis.hsetAsync(sessionKey, "type", sessionType); } /** * 清理会话中超出范围的消息。 * * 目前只实现超出1000条数据清理的逻辑,7天前的消息清除逻辑暂不实现。 * * @param sessionId */ static cleanOutRangeMessage(sessionId) { let maxMessageCacheCount = config.sessionConfig.maxMessageCount; let messageById = this.makeRedisKey(REDIS_KEYS.Messages, sessionId); let messagesByTimestampKey = this.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId); redis.zcardAsync(messagesByTimestampKey).then(function (count) { if (count > maxMessageCacheCount) { redis.zrevrangeAsync(messagesByTimestampKey, 0, count - maxMessageCacheCount).then(function (idList) { redis.zremAsync(messagesByTimestampKey, idList).then(function (res) { redis.hdel(messageById, idList); }) }); } }); } } // Expose class module.exports = Messages;