sessions.js 14 KB


  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let redisClient = RedisClient.redisClient();
  7. let redis = redisClient.connection;
  8. let RedisModel = require('./../redis.model.js');
  9. let modelUtil = require('../../util/model.util');
  10. let Messages = require('../messages/messages');
  11. let Participants = require('./Participants');
  12. let log = require('../../util/log.js');
  13. const RedisKeys = require('../../include/commons').REDIS_KEYS;
  14. const Commons = require('../../include/commons');
  15. let configFile = require('../../include/commons').CONFIG_FILE;
  16. let config = require('../../resources/config/' + configFile);
  17. let SessionRepo = require('../../repository/mysql/session.repo');
  18. let mongoose = require('mongoose');
  /**
   * Converts a zero-based page number and a page size into the inclusive
   * [start, stop] index pair used by Redis ZRANGE / ZREVRANGE.
   *
   * Redis treats the stop index as inclusive, so a page of `pagesize` entries
   * ends at `start + pagesize - 1`. Numeric strings are accepted. A missing,
   * zero or negative page size is clamped to 1 so that the stop index can never
   * become -1, which Redis would read as "up to the last element".
   *
   * @param page zero-based page number
   * @param pagesize number of entries per page
   * @returns {number[]} [start, stop], both inclusive
   */
  function toRedisRange(page, pagesize) {
      const size = Math.max(1, Number(pagesize) || 0);
      const start = page > 0 ? Number(page) * size : 0;
      return [start, start + size - 1];
  }

  /**
   * Session model backed by Redis, with writes mirrored to MySQL through
   * SessionRepo / Participants / Messages.
   *
   * Results are not returned to the caller; they are published with
   * modelUtil.emitData on this.eventEmitter (presumably set up by RedisModel -
   * TODO confirm), normally as {"status": 200 | -1, ...}.
   */
  class Sessions extends RedisModel {
      constructor() {
          super();
      }

      /**
       * Creates a session: stores its participants and its summary hash in
       * Redis, then persists the session and its participants to MySQL.
       *
       * Emits {"status": 200, "msg": "session create success!"} once the
       * participants are stored, or {"status": -1, "msg": res} when
       * saveParticipantsToRedis reports a falsy result.
       *
       * @param sessionId session ID
       * @param name session name
       * @param type session type
       * @param users comma-separated list of participant IDs
       */
      createSession(sessionId, name, type, users) {
          const createDate = new Date();
          const sessionKey = super.makeRedisKey(RedisKeys.Session, sessionId);
          const participants = new Participants();
          const userIds = users.split(",");

          participants.saveParticipantsToRedis(sessionId, userIds, createDate, (res) => {
              if (!res) {
                  modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": res});
                  return;
              }

              // Seed the session hash with an empty system message.
              const systemMessage = {
                  senderId: "system",
                  senderName: "系统消息",
                  timestamp: createDate,
                  content: "",
                  contentType: "1"
              };
              // The hash write is not awaited before answering (as before), but a
              // failure is now logged instead of becoming an unhandled rejection.
              this.updateLastContent(sessionKey, type, name, systemMessage).catch((err) => {
                  log.error("createSession: failed to update session hash " + sessionKey + ": " + err);
              });

              modelUtil.emitData(this.eventEmitter, {"status": 200, "msg": "session create success!"});
              this.saveSessionToMysql(sessionId, name, type, createDate);
              // Pass a copy so the MySQL writer never sees an array another callee mutated.
              participants.saveParticipantsToMysql(sessionId, userIds.slice());
          });
      }

      /**
       * Persists a session record through SessionRepo.saveSession.
       *
       * @param sessionId
       * @param name
       * @param type
       * @param createDate
       */
      saveSessionToMysql(sessionId, name, type, createDate) {
          SessionRepo.saveSession(sessionId, name, type, createDate);
      }

      /**
       * Looks up all sessions of a user through SessionRepo.findAll.
       *
       * @param userId
       * @param handler callback forwarded unchanged to SessionRepo.findAll
       */
      getUserSessionsFromMysql(userId, handler) {
          SessionRepo.findAll(userId, handler);
      }

      /**
       * Looks up a single session through SessionRepo.findOne.
       *
       * @param sessionId
       * @param handler callback forwarded unchanged to SessionRepo.findOne
       */
      getSessions(sessionId, handler) {
          SessionRepo.findOne(sessionId, handler);
      }

      /**
       * Emits one page of the user's sessions, highest score first.
       *
       * Each entry is the session hash extended with `sessionId` and `message`,
       * the number of messages stored between the user's score in the session's
       * participant set and now. The list is emitted only after every session
       * has been resolved and keeps the order returned by Redis.
       *
       * Emits {"status": 200, "data": [...]} on success. On failure it emits the
       * plain string "get list error <err>,user:<userId>", kept as is for
       * backward compatibility.
       *
       * @param userId
       * @param page zero-based page number
       * @param pagesize number of sessions per page
       */
      getUserSessions(userId, page, pagesize) {
          const userSessionKey = super.makeRedisKey(RedisKeys.UserSessions, userId);
          const [start, stop] = toRedisRange(page, pagesize);

          // Resolves one session ID into its list entry. Arrow functions keep the
          // `super` binding, so makeRedisKey is still invoked with its receiver.
          const loadSession = (sessionId) => {
              const sessionKey = super.makeRedisKey(RedisKeys.Session, sessionId);
              const participantsKey = super.makeRedisKey(RedisKeys.Participants, sessionId);
              const messageTimeKey = super.makeRedisKey(RedisKeys.MessagesByTimestamp, sessionId);
              let sessionHash;

              return redis.hgetallAsync(sessionKey)
                  .then((hash) => {
                      sessionHash = hash;
                      // Score of the user in the participant set: the last time
                      // this user fetched the session's messages.
                      return redis.zscoreAsync(participantsKey, userId);
                  })
                  .then((lastSeen) => redis.zrangebyscoreAsync(messageTimeKey, lastSeen, Date.now()))
                  .then((newMessages) => Object.assign({}, sessionHash, {
                      sessionId: sessionId,
                      message: newMessages.length
                  }));
          };

          redis.zrevrangeAsync(userSessionKey, start, stop)
              .then((sessionIds) => {
                  if (sessionIds.length === 0) {
                      modelUtil.emitData(this.eventEmitter, {"status": 200, "data": sessionIds});
                      return undefined;
                  }
                  return Promise.all(sessionIds.map(loadSession)).then((sessionList) => {
                      modelUtil.emitData(this.eventEmitter, {"status": 200, "data": sessionList});
                  });
              })
              .catch((err) => {
                  modelUtil.emitData(this.eventEmitter, "get list error " + err + ",user:" + userId);
              });
      }

      /**
       * Emits one page of a session's messages, newest first.
       *
       * Pages at or beyond config.sessionConfig.maxMessageCount are served by
       * Messages.getMessageByPage (the original comment says MySQL). Other pages
       * are read from Redis after checking that `user` is a participant. After a
       * successful Redis read, the user's score in the participant set is
       * refreshed to the current time.
       *
       * Emits {"status": 200, "data": messages} on success,
       * {"status": -1, "msg": ...} when the user is not a participant or Redis
       * fails, and {"status": -1, "data": err} when getMessageByPage fails.
       *
       * @param sessionId session ID
       * @param user ID of the user pulling the messages
       * @param page zero-based page number
       * @param pagesize number of messages per page
       */
      getMessages(sessionId, user, page, pagesize) {
          const messageTimestampKey = super.makeRedisKey(RedisKeys.MessagesByTimestamp, sessionId);
          const messageKey = super.makeRedisKey(RedisKeys.Messages, sessionId);
          const participantsKey = super.makeRedisKey(RedisKeys.Participants, sessionId);

          // Beyond the configured Redis window, delegate to the Messages model.
          if (page * pagesize >= config.sessionConfig.maxMessageCount) {
              const messageModel = new Messages();
              messageModel.getMessageByPage(sessionId, page, pagesize, (err, res) => {
                  if (err) {
                      modelUtil.emitData(this.eventEmitter, {"status": -1, "data": err});
                  } else {
                      modelUtil.emitData(this.eventEmitter, {"status": 200, "data": res});
                  }
              });
              return;
          }

          const [start, stop] = toRedisRange(page, pagesize);
          const participants = new Participants();

          participants.existsParticipant(sessionId, user, (exists) => {
              if (!exists) {
                  modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"});
                  return;
              }

              // Newest message IDs first, then resolve them to message bodies.
              redis.zrevrangeAsync(messageTimestampKey, start, stop)
                  .then((messageIds) => {
                      if (messageIds.length === 0) {
                          modelUtil.emitData(this.eventEmitter, {"status": 200, "data": []});
                          return undefined;
                      }
                      // Returned so that a failing HMGET reaches the catch below.
                      return redis.hmgetAsync(messageKey, messageIds).then((messages) => {
                          modelUtil.emitData(this.eventEmitter, {"status": 200, "data": messages});

                          // Best effort: remember when this user last fetched messages.
                          // The response is already out, so a failure is only logged.
                          redis.zaddAsync(participantsKey, Date.now(), user).catch((err) => {
                              log.error("getMessages: failed to update last fetch time for " + user + ": " + err);
                          });
                      });
                  })
                  .catch((err) => {
                      modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": err});
                  });
          });
      }

      /**
       * Writes the session summary hash: type, name and the latest message.
       *
       * @param session_key Redis key of the session hash
       * @param session_type session type
       * @param name session (topic) name
       * @param message object providing timestamp, content, contentType,
       *                senderId and senderName
       * @returns {*} the promise returned by redis.hmsetAsync
       */
      updateLastContent(session_key, session_type, name, message) {
          return redis.hmsetAsync(session_key,
              "create_date", message.timestamp,
              "last_content", message.content,
              "last_content_type", message.contentType,
              "type", session_type,
              "senderId", message.senderId,
              "senderName", message.senderName,
              "name", name
          );
      }

      /**
       * Stores a message sent to a session.
       *
       * Verifies that the sender (message.senderId) is a participant, saves the
       * message to Redis, refreshes the session summary hash, then hands the
       * message to Messages.saveMessageToMysql.
       *
       * Emits {"status": 200, "msg": "发送成功!"} on success and
       * {"status": -1, "msg": ...} when the sender is not a participant, the
       * session hash has no type/name, or a Redis call fails.
       *
       * @param message message payload; must carry senderId
       * @param sessionId session ID
       */
      saveMessageBySession(message, sessionId) {
          const messages = new Messages();
          const participants = new Participants();
          const sessionKey = super.makeRedisKey(RedisKeys.Session, sessionId);
          const messageId = mongoose.Types.ObjectId().toString();

          // The sender comes from the incoming payload, not from the Messages model.
          participants.existsParticipant(sessionId, message.senderId, (exists) => {
              if (!exists) {
                  modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": "用户不在此会话当中!"});
                  return;
              }

              let sessionType;
              let sessionName;

              redis.hmgetAsync(sessionKey, ["type", "name"])
                  .then((fields) => {
                      sessionType = fields[0];
                      sessionName = fields[1];
                      if (!sessionType || !sessionName) {
                          log.error("session is error for key " + sessionKey);
                          // Thrown as a string on purpose: the catch below forwards
                          // the thrown value verbatim as "msg".
                          throw "session is not found";
                      }
                      return messages.saveMessageForRedis(messageId, sessionId, message);
                  })
                  .then(() => this.updateLastContent(sessionKey, sessionType, sessionName, message))
                  .then(() => {
                      messages.saveMessageToMysql(message, sessionType, messageId, sessionId);
                      modelUtil.emitData(this.eventEmitter, {"status": 200, "msg": "发送成功!"});
                  })
                  .catch((err) => {
                      modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": err});
                  });
          });
      }

      /**
       * Pins a session to the top of the user's session list.
       *
       * Reads the highest score in the user's session set. A score above the
       * current time is taken to mean that some session is already pinned, so
       * the new score is that value + 1. Otherwise
       * Commons.STICKY_SESSION_BASE_SCORE is used. The score is then saved via
       * SessionRepo.saveStickySession.
       *
       * Emits {"status": 200, "msg": "置顶成功!"} on success and
       * {"status": -1, "msg": err} when a Redis call fails.
       *
       * @param sessionId session to pin
       * @param user owner of the session list
       */
      stickSession(sessionId, user) {
          const userSessionKey = super.makeRedisKey(RedisKeys.UserSessions, user);

          redis.zrevrangeAsync(userSessionKey, 0, 0)
              .then((topSessions) => {
                  // An empty set has no top score; fall through to the base score.
                  if (topSessions.length === 0) {
                      return null;
                  }
                  return redis.zscoreAsync(userSessionKey, topSessions[0]);
              })
              .then((topScore) => {
                  // A top score not later than "now" means nothing is pinned yet.
                  const nothingPinned = topScore <= Date.now();
                  const score = nothingPinned
                      ? Commons.STICKY_SESSION_BASE_SCORE
                      : Number(topScore) + 1;

                  return redis.zaddAsync(userSessionKey, score, sessionId).then((res) => {
                      log.info("stickSession:" + sessionId + ",res:" + res);
                      modelUtil.emitData(this.eventEmitter, {"status": 200, "msg": "置顶成功!"});
                      SessionRepo.saveStickySession(sessionId, user, score);
                  });
              })
              .catch((err) => {
                  log.error("stickSession:" + sessionId + ",error:" + err);
                  modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": err});
              });
      }

      /**
       * Unpins a session.
       *
       * Resets the session's score in the user's session set to the user's score
       * in the session's participant set, or to the current time when no such
       * score exists. Then calls SessionRepo.unstickSession.
       *
       * Emits {"status": 200, "msg": "取消置顶成功!"} on success and
       * {"status": -1, "msg": err} when a Redis call fails.
       *
       * @param sessionId session to unpin
       * @param user owner of the session list
       */
      cancelStickSession(sessionId, user) {
          const userSessionKey = super.makeRedisKey(RedisKeys.UserSessions, user);
          const participantsKey = super.makeRedisKey(RedisKeys.Participants, sessionId);

          redis.zscoreAsync(participantsKey, user)
              .then((lastSeen) => redis.zaddAsync(userSessionKey, lastSeen || Date.now(), sessionId))
              .then(() => {
                  log.info("cancelStickSession:" + sessionId);
                  modelUtil.emitData(this.eventEmitter, {"status": 200, "msg": "取消置顶成功!"});
                  SessionRepo.unstickSession(sessionId, user);
              })
              .catch((err) => {
                  log.error("cancelStickSession:" + sessionId + ",error:" + err);
                  modelUtil.emitData(this.eventEmitter, {"status": -1, "msg": err});
              });
      }
  }
  329. // Expose class
  330. module.exports = Sessions;