messages.js 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. /**
  2. * 消息模型。
  3. */
  4. "use strict";
  5. let MessageRepo = require('../../repository/mysql/message.repo');
  6. let RedisModel = require('./../redis.model.js');
  7. let RedisClient = require('../../repository/redis/redis.client.js');
  8. let Sessions = require('../sessions/sessions');
  9. let redis = RedisClient.redisClient().connection;
  10. let log = require('../../util/log.js');
  11. let configFile = require('../../include/commons').CONFIG_FILE;
  12. let config = require('../../resources/config/' + configFile);
  13. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  14. class Messages extends RedisModel {
  15. constructor() {
  16. super();
  17. }
  18. /**
  19. * 根据topicId获取对应的议题的信息列表
  20. * @param topicId
  21. */
  22. getMessagesByTopicId(topicId) {
  23. }
  24. /**
  25. * 分页
  26. * 根据topicId获取对应的议题的成员信息
  27. * @param topicId
  28. * @param page
  29. * @param pagesize
  30. */
  31. getMessagesByTopicIdForPage(topicId, page, pagesize) {
  32. }
  33. /**
  34. * 获取消息MySQL
  35. * @param sessionId
  36. * @param page
  37. * @param pagesize
  38. * @param handler
  39. */
  40. getMessageFromMySQL(sessionId, page, pagesize, handler) {
  41. MessageRepo.findBySessionId(sessionId, page, pagesize, handler);
  42. }
  43. /**
  44. * 获取单条消息
  45. *
  46. * @param messageId
  47. */
  48. getMessageById(messageId) {
  49. }
  50. /**
  51. * 将消息保存至REDIS,并更新会话的最后状态。也会清理会话的过期消息。
  52. *
  53. * @param messageId
  54. * @param sessionId
  55. * @param sessionType
  56. * @param message
  57. */
  58. saveMessageToRedis(sessionId, sessionType, messageId, message) {
  59. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  60. let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  61. let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  62. redis.multi()
  63. .hset(messageKey, messageId, JSON.stringify(message))
  64. .zadd(messageTimestampKey, message.timestamp.getTime(), messageId)
  65. .execAsync()
  66. .then(function (res) {
  67. Messages.updateLastContent(sessionKey, sessionType, null, message);
  68. Messages.cleanOutRangeMessage(sessionId); // clean out range messages
  69. });
  70. }
  71. /**
  72. * 保存Message 到mysql
  73. * @param message 消息对象
  74. * @param sessionType
  75. * @param messageId
  76. * @param sessionId
  77. * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组
  78. */
  79. saveMessageToMysql(sessionId, sessionType, messageId, message, handler) {
  80. MessageRepo.save(message, sessionType, messageId, sessionId, handler);
  81. }
  82. /**
  83. * 更新会话最后一条消息
  84. *
  85. * @param sessionKey rediskey
  86. * @param sessionType
  87. * @param name 议题名称
  88. * @param message
  89. * @returns {*}
  90. */
  91. static updateLastContent(sessionKey, sessionType, name, message) {
  92. redis.hmsetAsync(sessionKey,
  93. "create_date", message.timestamp,
  94. "last_content", message.content,
  95. "last_content_type", message.content_type,
  96. "sender_id", message.sender_id,
  97. "sender_name", message.sender_name
  98. );
  99. if(name != null) redis.hsetAsync(sessionKey, "name", name);
  100. if(sessionType != null) redis.hsetAsync(sessionKey, "type", sessionType);
  101. }
  102. /**
  103. * 清理会话中超出范围的消息。
  104. *
  105. * 目前只实现超出1000条数据清理的逻辑,7天前的消息清除逻辑暂不实现。
  106. *
  107. * @param sessionId
  108. */
  109. static cleanOutRangeMessage(sessionId) {
  110. let maxMessageCacheCount = config.sessionConfig.maxMessageCount;
  111. let messageById = this.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  112. let messagesByTimestampKey = this.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  113. redis.zcardAsync(messagesByTimestampKey).then(function (count) {
  114. if (count > maxMessageCacheCount) {
  115. redis.zrevrangeAsync(messagesByTimestampKey, 0, count - maxMessageCacheCount).then(function (idList) {
  116. redis.zremAsync(messagesByTimestampKey, idList).then(function (res) {
  117. redis.hdel(messageById, idList);
  118. })
  119. });
  120. }
  121. });
  122. }
  123. }
  124. // Expose class
  125. module.exports = Messages;