sessions.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Participants = require('./Participants');
  10. let SessionRepo = require('../../repository/mysql/session.repo');
  11. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  12. let configFile = require('../../include/commons').CONFIG_FILE;
  13. let config = require('../../resources/config/' + configFile);
  14. let redis = RedisClient.redisClient().connection;
  15. let logger = require('../../util/log.js');
  16. let mongoose = require('mongoose');
  17. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  18. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  19. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  20. class Sessions extends RedisModel {
  21. constructor() {
  22. super();
  23. }
  24. /**
  25. * 创建会话。会话的ID来源:
  26. * MUC:患者的ID
  27. * P2P:对成员的ID排序后,取hash值
  28. * GROUP:团队的ID
  29. *
  30. * @param sessionId
  31. * @param name 会话名称
  32. * @param type 会话类型
  33. * @param participantArray 会话成员
  34. * @param handler 回调,仅MUC模式使用
  35. */
  36. createSession(sessionId, name, type, participantArray, handler) {
  37. let self = this;
  38. if (type == SESSION_TYPES.P2P) {
  39. var participantIdArray = [];
  40. for (let i in participantArray) {
  41. participantIdArray.push(participantArray[i].split(":")[0]);
  42. }
  43. if (participantIdArray.length != 2) {
  44. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "P2P session only allow 2 participants."});
  45. return false;
  46. }
  47. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  48. sessionId = res;
  49. callCreate(sessionId);
  50. });
  51. } else {
  52. callCreate(sessionId);
  53. }
  54. function callCreate(sessionId) {
  55. SessionRepo.findOne(sessionId, function (err, res) {
  56. if (res.length > 0) {
  57. let session = res[0];
  58. ModelUtil.emitOK(self.eventEmitter, {
  59. id: session.id,
  60. name: session.name,
  61. type: session.type,
  62. create_date: session.create_date
  63. });
  64. return;
  65. }
  66. let createDate = new Date();
  67. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  68. // 保存会话及成员至MySQL中
  69. self.saveSessionToMysql(sessionId, name, type, createDate, function (err, res) {
  70. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  71. // 保存会话及成员至Redis中,并更新会话的最后状态
  72. let isMucSession = SESSION_TYPES.MUC == type;
  73. let message = {
  74. sender_id: "System",
  75. sender_name: "System",
  76. content_type: 1,
  77. content: "",
  78. timestamp: createDate
  79. };
  80. Messages.updateLastContent(sessionKey, type, name, message);
  81. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  82. if (isMucSession) {
  83. handler(true, sessionId);
  84. } else {
  85. ModelUtil.emitOK(self.eventEmitter, {id: sessionId});
  86. }
  87. });
  88. });
  89. });
  90. });
  91. }
  92. }
  93. /**
  94. * 保存session到MySQL
  95. * @param sessionId
  96. * @param name
  97. * @param type
  98. * @param createDate
  99. * @param handler
  100. */
  101. saveSessionToMysql(sessionId, name, type, createDate, handler) {
  102. SessionRepo.saveSession(sessionId, name, type, createDate, handler);
  103. }
  104. /**
  105. * 获取某个用户的全部session列表
  106. * @param userId
  107. * @param handler
  108. */
  109. getUserSessionsFromMysql(userId, handler) {
  110. SessionRepo.findAll(userId, handler);
  111. }
  112. /**
  113. * 获取session单个对象
  114. * @param sessionId
  115. * @param handler
  116. */
  117. getSessions(sessionId, handler) {
  118. SessionRepo.findOne(sessionId, handler);
  119. }
  120. /**
  121. * 根据用户ID获取用户的session列表
  122. * @param userId
  123. * @param page
  124. * @param size
  125. */
  126. getUserSessions(userId, page, size) {
  127. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  128. let self = this;
  129. if (page > 0) {
  130. page = page * size;
  131. size = size + page;
  132. }
  133. // 倒序获取
  134. redis.zrevrangeAsync(userSessionKey, page, size).then(function (res) {
  135. let sessionList = [];
  136. if (res.length == 0) {
  137. ModelUtil.emitOK(self.eventEmitter, []);
  138. return;
  139. }
  140. for (let i in res) {
  141. callGetSessions(res[i], i == res.length - 1);
  142. }
  143. function callGetSessions(sessionId, lastOne) {
  144. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  145. redis.hgetallAsync(sessionKey).then(function (session) {
  146. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  147. // 对比当前用户最后一次此会话消息的时间与会话中最新的消息时间,以此判断未读消息数量
  148. redis.zscoreAsync(sessionParticipantsKey, userId).then(function (lastFetchTime) {
  149. callGetUnreadCount(session, sessionId, lastFetchTime, lastOne);
  150. })
  151. }).catch(function (err) {
  152. throw err;
  153. })
  154. }
  155. /**
  156. * 统计未读消息数。以当前时间为准。
  157. *
  158. * @param session 返回的会话列表
  159. * @param sessionId 当前会话ID
  160. * @param lastFetchTime 当前会话当前用户的最后一次时间搓
  161. * @param lastOne
  162. */
  163. function callGetUnreadCount(session, sessionId, lastFetchTime, lastOne) {
  164. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  165. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, (new Date().getTime()))
  166. .then(function (messagetimelist) {
  167. session.id = sessionId;
  168. session.unread_count = messagetimelist.length;
  169. callGetMyRole(session, sessionId, lastOne);
  170. })
  171. .catch(function (err) {
  172. throw err;
  173. });
  174. }
  175. /**
  176. * 获取用户在此会话中的角色。
  177. *
  178. * @param session 要返回的JSON
  179. * @param sessionId
  180. * @param lastOne
  181. */
  182. function callGetMyRole(session, sessionId, lastOne) {
  183. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  184. redis.hgetAsync(participantsRoleKey, userId).then(function (role) {
  185. session.my_role = role;
  186. callback(session, lastOne);
  187. })
  188. }
  189. /**
  190. * 列表封装完毕后由此回调返回数据界面
  191. *
  192. * @param session
  193. * @param lastOne
  194. */
  195. function callback(session, lastOne) {
  196. sessionList.push(session);
  197. if (lastOne) {
  198. ModelUtil.emitOK(self.eventEmitter, sessionList);
  199. }
  200. }
  201. }).catch(function (err) {
  202. ModelUtil.emitError(self.eventEmitter, {message: "Get sessions failed: " + err});
  203. })
  204. }
  205. /**
  206. * 根据会话中的消息
  207. *
  208. * @param sessionId 会话ID
  209. * @param user 拉取消息的人
  210. * @param page 第几页
  211. * @param pagesize 分页数量
  212. */
  213. getMessages(sessionId, user, stratmsgid,endmsgid ) {
  214. let self = this;
  215. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  216. let message_key = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  217. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  218. //超过最大限制后从mysql获取数据
  219. // if (page * pagesize >= config.sessionConfig.maxMessageCount) {
  220. // self.getMessageFromMySQL(sessionId, page, pagesize, function (err, res) {
  221. // if (!err) {
  222. // ModelUtil.emitOK(self.eventEmitter, {"status": 200, "data": res});
  223. // } else {
  224. // ModelUtil.emitOK(self.eventEmitter, {"status": -1, "data": err});
  225. // }
  226. // })
  227. // } else {
  228. // if (page > 0) {
  229. // page = page * pagesize;
  230. // pagesize = pagesize + page;
  231. // }
  232. let participants = new Participants();
  233. participants.existsParticipant(sessionId, user, function (res) {
  234. if (!res) {
  235. ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"});
  236. } else {
  237. //倒序取出最后N条消息
  238. redis.zrevrangebyscoreAsync(message_timestamp_key, endmsgid, stratmsgid).then(function (res) {
  239. //取出消息实体
  240. if (res.length == 0) {
  241. ModelUtil.emitOK(self.eventEmitter, {"status": 200, "data": []});
  242. return;
  243. }
  244. redis.hmgetAsync(message_key, res).then(function (messages) {
  245. console.log(messages)
  246. //将取到的消息返回给前端
  247. ModelUtil.emitOK(self.eventEmitter, {"status": 200, "data": messages});
  248. }).then(function () {
  249. //更新患者最后一次获取消息的日期
  250. redis.zaddAsync(participants_key, (new Date().getTime()), user).then(function (res) {
  251. console.log(res);
  252. }).catch(function (res) {
  253. throw res;
  254. })
  255. })
  256. }).catch(function (res) {
  257. ModelUtil.emitOK(self.eventEmitter, {"status": -1, "msg": res});
  258. })
  259. }
  260. })
  261. // }
  262. }
  263. getAllSessionsUnreadMessageCount(){}
  264. /**
  265. * 获取会话的未读消息数。
  266. *
  267. * @param sessionId
  268. * @param userId
  269. */
  270. getSessionUnreadMessageCount(sessionId, userId){
  271. }
  272. /**
  273. * 保存消息。
  274. *
  275. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  276. *
  277. * see also: saveMessageByTopic
  278. *
  279. * @param message
  280. * @param sessionId
  281. */
  282. saveMessageBySession(sessionId, message) {
  283. let self = this;
  284. let messages = new Messages();
  285. let participants = new Participants();
  286. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  287. let messageId = mongoose.Types.ObjectId().toString();
  288. // 检查会话中是否存在此成员
  289. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  290. if (err) {
  291. ModelUtil.emitError(self.eventEmitter, "Check session paticipant failed: ", err);
  292. return;
  293. }
  294. if (res) {
  295. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  296. let sessionType = res[0];
  297. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  298. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  299. if (err) {
  300. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  301. } else {
  302. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  303. }
  304. });
  305. }).then(function (res) {
  306. // TODO: 消息推送
  307. }).catch(function (err) {
  308. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  309. })
  310. } else {
  311. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  312. }
  313. });
  314. }
  315. /**
  316. * 保存消息
  317. *
  318. * @param message
  319. * @param sessionId
  320. */
  321. saveMessageByTopic(message, sessionId, handler) {
  322. let self = this;
  323. let messages = new Messages();
  324. let participants = new Participants();
  325. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  326. let messageId = mongoose.Types.ObjectId().toString();
  327. let sessionType = 0;
  328. let name = "";
  329. participants.existsParticipant(sessionId, message.senderId, function (err, res) {
  330. //校验发送成员是都在讨论组
  331. if (res) {
  332. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  333. sessionType = res[0];
  334. name = res[1];
  335. if (!sessionType || !name) {
  336. logger.error("session is error for key " + session_key);
  337. throw "session is not found";
  338. }
  339. }).then(function (res) {
  340. //更新消息相关
  341. return messages.saveMessageForRedis(messageId, sessionId, message);
  342. }).then(function (res) {
  343. //更新session的最后一条聊天记录
  344. return Messages.updateLastContent(session_key, sessionType, name, message);
  345. }).then(function (res) {
  346. //操作mysql数据库
  347. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  348. //返回数据给前端。
  349. handler(null, messageId)
  350. //消息推送
  351. }).catch(function (res) {
  352. handler(res, messageId)
  353. })
  354. } else {
  355. handler("用户不在此会话当中!", messageId);
  356. }
  357. })
  358. }
  359. /**
  360. * 置顶操作
  361. */
  362. stickSession(sessionId, user) {
  363. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  364. let self = this;
  365. //取出最大的session
  366. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  367. //获取该session的时间搓
  368. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  369. let nowtime = new Date().getTime();
  370. //当前时间搓比redis的时间搓更早证明没有置顶过
  371. if (scoreres <= nowtime) {
  372. //初始化置顶
  373. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  374. logger.info("stickSession:" + sessionId + ",res:" + res);
  375. ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
  376. }).then(function () {
  377. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  378. })
  379. } else {
  380. //已有置顶的数据,取出来加1保存回去
  381. scoreres = Number(scoreres) + 1;
  382. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  383. logger.info("stickSession:" + sessionId + ",res:" + res);
  384. ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
  385. }).then(function () {
  386. SessionRepo.saveStickySession(sessionId, user, scoreres);
  387. })
  388. }
  389. })
  390. })
  391. }
  392. /**
  393. * 取消会话置顶
  394. */
  395. cancelStickSession(sessionId, user) {
  396. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  397. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  398. let self = this;
  399. redis.zscoreAsync(participants_key, user).then(function (res) {
  400. if (!res) {
  401. res = new Date().getTime();
  402. }
  403. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  404. logger.info("cancelStickSession:" + sessionId);
  405. ModelUtil.emitOK(self.eventEmitter, {"status": 200, "msg": "取消置顶成功!"});
  406. }).then(function () {
  407. SessionRepo.unstickSession(sessionId, user);
  408. });
  409. })
  410. }
  411. }
  412. // Expose class
  413. module.exports = Sessions;