messages.js 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. /**
  2. * 消息模型。
  3. */
  4. "use strict";
  5. let MessageRepo = require('../../repository/mysql/message.repo');
  6. let RedisModel = require('./../redis.model.js');
  7. let SessionRepo = require('../../repository/mysql/session.repo');
  8. let RedisClient = require('../../repository/redis/redis.client.js');
  9. let ModelUtil = require('../../util/model.util');
  10. var ObjectUtil = require("../../util/object.util.js");
  11. let redis = RedisClient.redisClient().connection;
  12. let log = require('../../util/log.js');
  13. let configFile = require('../../include/commons').CONFIG_FILE;
  14. let config = require('../../resources/config/' + configFile);
  15. let logger = require('../../util/log');
  16. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  17. class Messages extends RedisModel {
  18. constructor() {
  19. super();
  20. }
  21. /**
  22. * 根据topicId获取对应的议题的信息列表
  23. * @param topicId
  24. */
  25. getMessagesByTopicId(topicId) {
  26. }
  27. /**
  28. * 分页
  29. * 根据topicId获取对应的议题的成员信息
  30. * @param topicId
  31. * @param page
  32. * @param pagesize
  33. */
  34. getMessagesByTopicIdForPage(topicId, page, pagesize) {
  35. }
  36. /**
  37. * 获取消息MySQL
  38. * @param sessionId
  39. * @param page
  40. * @param pagesize
  41. * @param handler
  42. */
  43. getMessageFromMySQL(sessionId, page, pagesize, messageType, handler) {
  44. MessageRepo.findBySessionId(sessionId, page, pagesize, messageType, handler);
  45. }
  46. /**
  47. * 获取消息MySQL
  48. * @param sessionId
  49. * @param page
  50. * @param pagesize
  51. * @param messageType
  52. */
  53. getMessageByType(sessionId, page, pagesize, messageType) {
  54. let self = this;
  55. page = parseInt(page);
  56. pagesize = parseInt(pagesize);
  57. MessageRepo.findBySessionId(sessionId, page, pagesize, messageType, function (err, res) {
  58. if (err) {
  59. ModelUtil.emitError(self.eventEmitter, {message: "Get message by session and type failed: " + err});
  60. } else {
  61. ModelUtil.emitOK(self.eventEmitter, {data: res});
  62. }
  63. });
  64. }
  65. /**
  66. * 获取单条消息
  67. *
  68. * @param messageId
  69. */
  70. getMessageById(messageId) {
  71. }
  72. /**
  73. * 将消息保存至REDIS,并更新会话的最后状态,清理会话的过期消息。
  74. *
  75. * PS:更新此处的消息缓存代码,记得更新Users.login()
  76. *
  77. * @param messageId
  78. * @param sessionId
  79. * @param sessionType
  80. * @param message
  81. */
  82. saveMessageToRedis(sessionId, sessionType, messageId, message) {
  83. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  84. let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  85. let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  86. let msgJson = {
  87. id: message.id,
  88. sender_id: message.sender_id,
  89. sender_name: message.sender_name,
  90. timestamp: message.timestamp.getTime(),
  91. content_type: message.content_type,
  92. content: message.content,
  93. business_type:message.business_type||1
  94. };
  95. redis.multi()
  96. .hset(messageKey, messageId, JSON.stringify(msgJson)) // 保存消息
  97. .zadd(messageTimestampKey, message.timestamp.getTime(), messageId) // 保存消息时间
  98. .execAsync()
  99. .then(function (res) {
  100. Messages.updateLastContent(sessionKey, sessionType, null, message);
  101. SessionRepo.updateSessionLastStatus(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionId);
  102. Messages.cleanOutRangeMessage(sessionId); // clean out range messages
  103. })
  104. .catch(function (ex) {
  105. log.error("Save message to redis failed: ", ex);
  106. });
  107. }
  108. /**
  109. *创建session发送session成功
  110. * @param messageId
  111. * @param sessionId
  112. * @param sessionType
  113. * @param message
  114. */
  115. saveMessageToRedisFromCreateSession(sessionId, messageId, message) {
  116. let messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  117. let messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  118. let msgJson = {
  119. id: message.id,
  120. sender_id: message.sender_id,
  121. sender_name: message.sender_name,
  122. timestamp: message.timestamp.getTime(),
  123. content_type: message.content_type,
  124. content: message.content,
  125. business_type:message.business_type||1
  126. };
  127. redis.multi()
  128. .hset(messageKey, messageId, JSON.stringify(msgJson)) // 保存消息
  129. .zadd(messageTimestampKey, message.timestamp.getTime(), messageId) // 保存消息时间
  130. .execAsync()
  131. .then(function (res) {
  132. Messages.cleanOutRangeMessage(sessionId); // clean out range messages
  133. })
  134. .catch(function (ex) {
  135. log.error("Save message to redis failed: ", ex);
  136. });
  137. }
  138. /**
  139. * 保存Message 到mysql
  140. * @param message 消息对象
  141. * @param sessionType
  142. * @param messageId
  143. * @param sessionId
  144. * @param handler
  145. * @type type 会话类型,1表示MUC会话,2表示P2P,3表示群会话,4表示临时讨论组
  146. */
  147. saveMessageToMysql(sessionId, sessionType, messageId, message, handler) {
  148. MessageRepo.save(message, sessionType, messageId, sessionId, handler);
  149. }
  150. /**
  151. * 更新会话最后一条消息
  152. *
  153. * @param sessionKey rediskey
  154. * @param sessionType
  155. * @param name 议题名称
  156. * @param message
  157. * @param businessType
  158. * @returns {*}
  159. */
  160. static updateLastContent(sessionKey, sessionType, name, message) {
  161. redis.hmsetAsync(sessionKey,
  162. "create_date", message.timestamp.getTime(),
  163. "last_content", message.content,
  164. "last_content_type", message.content_type,
  165. "last_sender_id", message.sender_id,
  166. "last_sender_name", message.sender_name
  167. );
  168. /**
  169. * mysql操作分离
  170. */
  171. // let sessionid = sessionKey.replace("sessions:", "");
  172. // SessionRepo.updateSessionLastStatus(message.sender_id,
  173. // message.sender_name,
  174. // message.timestamp,
  175. // message.content,
  176. // message.content_type,
  177. // sessionid, function (err, res) {
  178. // if (err) logger.error("Update session last status failed: ", err);
  179. // });
  180. if (name != null) redis.hsetAsync(sessionKey, "name", name);
  181. if (sessionType != null) redis.hsetAsync(sessionKey, "type", sessionType);
  182. }
  183. /**
  184. * 清理会话中超出范围的消息。
  185. *
  186. * 目前只实现超出1000条数据清理的逻辑,7天前的消息清除逻辑暂不实现。
  187. *
  188. * @param sessionId
  189. */
  190. static cleanOutRangeMessage(sessionId) {
  191. let maxMessageCacheCount = config.sessionConfig.maxMessageCount;
  192. let messageById = this.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  193. let messagesByTimestampKey = this.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  194. redis.zcardAsync(messagesByTimestampKey).then(function (count) {
  195. if (count > maxMessageCacheCount) {
  196. redis.zrevrangeAsync(messagesByTimestampKey, 0, count - maxMessageCacheCount).then(function (idList) {
  197. redis.zremAsync(messagesByTimestampKey, idList).then(function (res) {
  198. redis.hdel(messageById, idList);
  199. })
  200. });
  201. }
  202. });
  203. }
  204. }
  205. // Expose the Messages class as this module's export.
  206. module.exports = Messages;