messages.js 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. /**
  2. * 消息模型。
  3. */
  4. "use strict";
  5. let MessageRepo = require('../../repository/mysql/message.repo');
  6. let RedisModel = require('./../redis.model.js');
  7. let RedisClient = require('../../repository/redis/redis.client.js');
  8. let ModelUtil = require('../../util/model.util');
  9. let redis = RedisClient.redisClient().connection;
  10. let log = require('../../util/log.js');
  11. let configFile = require('../../include/commons').CONFIG_FILE;
  12. let config = require('../../resources/config/' + configFile);
  13. var ObjectUtil = require("../../util/object.util.js");
  14. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  15. class Messages extends RedisModel {
  16. constructor() {
  17. super();
  18. }
  19. /**
  20. * 根据topicId获取对应的议题的信息列表
  21. * @param topicId
  22. */
  23. getMessagesByTopicId(topicId) {
  24. }
  25. /**
  26. * 分页
  27. * 根据topicId获取对应的议题的成员信息
  28. * @param topicId
  29. * @param page
  30. * @param pagesize
  31. */
  32. getMessagesByTopicIdForPage(topicId, page, pagesize) {
  33. }
  34. /**
  35. * 获取消息MySQL
  36. * @param sessionId
  37. * @param page
  38. * @param pagesize
  39. * @param handler
  40. */
  41. getMessageFromMySQL(sessionId, page, pagesize,messageType, handler) {
  42. MessageRepo.findBySessionId(sessionId, page, pagesize,messageType, handler);
  43. }
  44. /**
  45. * 获取消息MySQL
  46. * @param sessionId
  47. * @param page
  48. * @param pagesize
  49. * @param handler
  50. */
  51. getMessageByType(sessionId, page, pagesize,messageType) {
  52. let self = this;
  53. page = parseInt(page);
  54. pagesize = parseInt(pagesize);
  55. MessageRepo.findBySessionId(sessionId, page, pagesize,messageType,function(err,res){
  56. if(err){
  57. ModelUtil.emitError(self.eventEmitter, {message: "Error get message by session and type : " + err});
  58. }else{
  59. ModelUtil.emitOK(self.eventEmitter, {data:res});
  60. }
  61. });
  62. }
  63. /**
  64. * 获取单条消息
  65. *
  66. * @param messageId
  67. */
  68. getMessageById(messageId) {
  69. }
  70. /**
  71. * 将消息保存至REDIS,并更新会话的最后状态,清理会话的过期消息。
  72. *
  73. * PS:更新此处的消息缓存代码,记得更新Users.login()
  74. *
  75. * @param messageId
  76. * @param sessionId
  77. * @param sessionType
  78. * @param message
  79. */
  80. saveMessageToRedis(sessionId, sessionType, messageId, message) {
  81. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  82. let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  83. let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  84. let msgJson = {
  85. id: message.id,
  86. sender_id: message.sender_id,
  87. sender_name: message.sender_name,
  88. timestamp: message.timestamp.getTime(),
  89. content_type: message.content_type,
  90. content: message.content
  91. };
  92. redis.multi()
  93. .hset(messageKey, messageId, JSON.stringify(msgJson)) // 保存消息
  94. .zadd(messageTimestampKey, message.timestamp.getTime(), messageId) // 保存消息时间
  95. .execAsync()
  96. .then(function (res) {
  97. Messages.updateLastContent(sessionKey, sessionType, null, message);
  98. Messages.cleanOutRangeMessage(sessionId); // clean out range messages
  99. });
  100. }
  101. /**
  102. * 保存Message 到mysql
  103. * @param message 消息对象
  104. * @param sessionType
  105. * @param messageId
  106. * @param sessionId
  107. * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组
  108. */
  109. saveMessageToMysql(sessionId, sessionType, messageId, message, handler) {
  110. MessageRepo.save(message, sessionType, messageId, sessionId, handler);
  111. }
  112. updateLastContentToMysql(sessionKey, sessionType, name, message){
  113. }
  114. /**
  115. * 更新会话最后一条消息
  116. *
  117. * @param sessionKey rediskey
  118. * @param sessionType
  119. * @param name 议题名称
  120. * @param message
  121. * @returns {*}
  122. */
  123. static updateLastContent(sessionKey, sessionType, name, message,businessType) {
  124. redis.hmsetAsync(sessionKey,
  125. "create_date", message.timestamp.getTime(),
  126. "last_content", message.content,
  127. "last_content_type", message.content_type,
  128. "sender_id", message.sender_id,
  129. "sender_name", message.sender_name
  130. );
  131. let sessionid =sessionKey.replace("sessions:","");
  132. MessageRepo.updateLastContent(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionid);
  133. if(name != null) redis.hsetAsync(sessionKey, "name", name);
  134. if(sessionType != null) redis.hsetAsync(sessionKey, "type", sessionType);
  135. if(businessType != null) redis.hsetAsync(sessionKey, "business_type", businessType);
  136. }
  137. /**
  138. * 清理会话中超出范围的消息。
  139. *
  140. * 目前只实现超出1000条数据清理的逻辑,7天前的消息清除逻辑暂不实现。
  141. *
  142. * @param sessionId
  143. */
  144. static cleanOutRangeMessage(sessionId) {
  145. let maxMessageCacheCount = config.sessionConfig.maxMessageCount;
  146. let messageById = this.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  147. let messagesByTimestampKey = this.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  148. redis.zcardAsync(messagesByTimestampKey).then(function (count) {
  149. if (count > maxMessageCacheCount) {
  150. redis.zrevrangeAsync(messagesByTimestampKey, 0, count - maxMessageCacheCount).then(function (idList) {
  151. redis.zremAsync(messagesByTimestampKey, idList).then(function (res) {
  152. redis.hdel(messageById, idList);
  153. })
  154. });
  155. }
  156. });
  157. }
  158. }
  159. // Expose class
  160. module.exports = Messages;