// sessions.js
  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Participants = require('./Participants');
  10. let SessionRepo = require('../../repository/mysql/session.repo');
  11. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  12. let configFile = require('../../include/commons').CONFIG_FILE;
  13. let config = require('../../resources/config/' + configFile);
  14. let redis = RedisClient.redisClient().connection;
  15. let log = require('../../util/log.js');
  16. let mongoose = require('mongoose');
  17. const RedisKeys = require('../../include/commons').REDIS_KEYS;
  18. const Commons = require('../../include/commons');
  19. class Sessions extends RedisModel {
  20. constructor() {
  21. super();
  22. }
  23. /**
  24. * 创建会话
  25. *
  26. * type = 1 sessionid = md5(patientId); MUC
  27. * type = 2 sessionId = hash(user1,user2); P2P
  28. * type = 3 sessionId = groupid; 团队群聊
  29. * @param sessionId 会话ID
  30. * @param name 会话名称
  31. * @param type 会话类型
  32. * @param users 会话成员
  33. */
  34. createSession(sessionId, name, type, users,handler) {
  35. let self = this;
  36. let _super = super.makeRedisKey;
  37. users = eval("["+users+"]")[0];
  38. if (type == config.sessionConfig.P2P) {//P2P消息用hash校验
  39. var userArray=[];
  40. for(var key in users){
  41. userArray.push(key);
  42. }
  43. if(userArray.length>2){
  44. ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "会话人数超过2个无法创建P2P会话!"});
  45. return false;
  46. }
  47. ParticipantRepo.findSessionIdByParticipantIds(userArray[0],userArray[0],function(err,res){
  48. sessionId = res;
  49. callcreate(sessionId);
  50. });
  51. }else{
  52. callcreate();
  53. }
  54. function callcreate(){
  55. let createDate = new Date();
  56. let session_key = _super(RedisKeys.Session, sessionId);
  57. let participants = new Participants();
  58. // 将session加入redis
  59. participants.saveParticipantsToRedis(sessionId, users, createDate, function (res) {
  60. if (!res) {
  61. ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
  62. } else {
  63. let messages = {};
  64. messages.senderId = "system";
  65. messages.senderName = "系统消息";
  66. messages.timestamp = createDate;
  67. messages.content = "";
  68. messages.contentType = "1";
  69. self.updateLastContent(session_key, type, name, messages);
  70. if(config.sessionConfig.MUC==type){
  71. handler(true);
  72. }else {
  73. modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "session create success!"});
  74. }
  75. self.saveSessionToMysql(sessionId, name, type, createDate);
  76. participants.saveParticipantsToMysql(sessionId, users); //创建session成员到数据库
  77. }
  78. })
  79. }
  80. }
  81. /**
  82. * 保存session到MySQL
  83. * @param sessionId
  84. * @param name
  85. * @param type
  86. * @param createDate
  87. */
  88. saveSessionToMysql(sessionId, name, type, createDate) {
  89. SessionRepo.saveSession(sessionId, name, type, createDate);
  90. }
  91. /**
  92. * 获取某个用户的全部session列表
  93. * @param userId
  94. * @param handler
  95. */
  96. getUserSessionsFromMysql(userId, handler) {
  97. SessionRepo.findAll(userId, handler);
  98. }
  99. /**
  100. * 获取session单个对象
  101. * @param sessionId
  102. * @param handler
  103. */
  104. getSessions(sessionId, handler) {
  105. SessionRepo.findOne(sessionId, handler);
  106. }
  107. /**
  108. * 根据用户ID获取用户的session列表
  109. * @param userId
  110. * @param page
  111. * @param pagesize
  112. */
  113. getUserSessions(userId, page, pagesize) {
  114. let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, userId);
  115. let self = this;
  116. let _super = super.makeRedisKey;
  117. if (page > 0) {
  118. page = page * pagesize;
  119. pagesize = pagesize + page;
  120. }
  121. //倒序
  122. redis.zrevrangeAsync(user_session_key, page, pagesize).then(function (res) {
  123. let sessionlist = [];
  124. if (res.length == 0) {
  125. ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": res});
  126. } else {
  127. for (var j in res) {
  128. calllist(res[j], j, res.length);
  129. }
  130. }
  131. function calllist(session, j, _len) {
  132. let session_key = _super(RedisKeys.Session, session);
  133. redis.hgetallAsync(session_key).then(function (res) {
  134. let participants_key = _super(RedisKeys.Participants, session);
  135. //当前用户最后一次登录改讨论组时间
  136. redis.zscoreAsync(participants_key, userId).then(function (restimestamp) {
  137. //时间差获取消息数量
  138. callamount(res, j, _len, session, restimestamp);
  139. })
  140. }).catch(function (err) {
  141. throw err;
  142. })
  143. }
  144. /**
  145. * 消息统计
  146. * @param res 返回的会话列表
  147. * @param j 当前会话列表的位置
  148. * @param _len 列表长度 用做返回前端操作
  149. * @param session 当前会话
  150. * @param restimestamp 当前会话当前用户的最后一次时间搓
  151. */
  152. function callamount(res, j, _len, session, restimestamp) {
  153. let message_time_key = _super(RedisKeys.MessagesByTimestamp, session);
  154. redis.zrangebyscoreAsync(message_time_key, restimestamp, (new Date().getTime())).then(function (messagetimelist) {
  155. res.sessionId = session;
  156. res.message = messagetimelist.length;
  157. callrole(res, j, _len,session);
  158. }).catch(function (err) {
  159. throw err;
  160. })
  161. }
  162. /**
  163. * 用户角色
  164. * @param res要返回的JSON
  165. * @param j 第N调数据
  166. * @param _len 总数据长度
  167. */
  168. function callrole(res, j, _len,session){
  169. let participants_role_key = _super(RedisKeys.ParticipantsRole, session);
  170. redis.hgetAsync(participants_role_key, userId).then(function(role){
  171. res.role=role;
  172. callback(res, j, _len);
  173. })
  174. }
  175. /**
  176. * 列表封装完毕后由此回调返回数据界面
  177. * @param res
  178. * @param j
  179. * @param _len
  180. */
  181. function callback(res, j, _len) {
  182. sessionlist.push(res);
  183. if (j == (_len - 1)) {
  184. ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": sessionlist});
  185. }
  186. }
  187. }).catch(function (res) {
  188. ModelUtil.emitData(self.eventEmitter, "get list error " + res + ",user:" + userId);
  189. })
  190. }
  191. /**
  192. * 根据会话中的消息
  193. *
  194. * @param sessionId 会话ID
  195. * @param user 拉取消息的人
  196. * @param page 第几页
  197. * @param pagesize 分页数量
  198. */
  199. getMessages(sessionId, user, page, pagesize) {
  200. let self = this;
  201. let message_timestamp_key = super.makeRedisKey(RedisKeys.MessagesByTimestamp, sessionId);
  202. let message_key = super.makeRedisKey(RedisKeys.Messages, sessionId);
  203. let participants_key = super.makeRedisKey(RedisKeys.Participants, sessionId);
  204. //超过最大限制后从mysql获取数据
  205. if (page * pagesize >= config.sessionConfig.maxMessageCount) {
  206. self.getMessageFromMySQL(sessionId, page, pagesize, function (err, res) {
  207. if (!err) {
  208. ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": res});
  209. } else {
  210. ModelUtil.emitData(self.eventEmitter, {"status": -1, "data": err});
  211. }
  212. })
  213. } else {
  214. if (page > 0) {
  215. page = page * pagesize;
  216. pagesize = pagesize + page;
  217. }
  218. let participants = new Participants();
  219. participants.existsParticipant(sessionId, user, function (res) {
  220. if (!res) {
  221. ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"});
  222. } else {
  223. //倒序取出最后N条消息
  224. redis.zrevrangeAsync(message_timestamp_key, page, pagesize).then(function (res) {
  225. //取出消息实体
  226. if (res.length == 0) {
  227. ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": []});
  228. return;
  229. }
  230. redis.hmgetAsync(message_key, res).then(function (messages) {
  231. console.log(messages)
  232. //将取到的消息返回给前端
  233. ModelUtil.emitData(self.eventEmitter, {"status": 200, "data": messages});
  234. }).then(function () {
  235. //更新患者最后一次获取消息的日期
  236. redis.zaddAsync(participants_key, (new Date().getTime()), user).then(function (res) {
  237. console.log(res);
  238. }).catch(function (res) {
  239. throw res;
  240. })
  241. })
  242. }).catch(function (res) {
  243. ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
  244. })
  245. }
  246. })
  247. }
  248. }
  249. /**
  250. * 更新会话最后一条消息
  251. *
  252. * @param session_key rediskey
  253. * @param session_type
  254. * @param name 议题名称
  255. * @param message
  256. * @returns {*}
  257. */
  258. updateLastContent(session_key, session_type, name, message) {
  259. return redis.hmsetAsync(session_key,
  260. "create_date", message.timestamp,
  261. "last_content", message.content,
  262. "last_content_type", message.contentType,
  263. "type", session_type,
  264. "senderId", message.senderId,
  265. "senderName", message.senderName,
  266. "name", name
  267. );
  268. }
  269. /**
  270. * 保存消息
  271. *
  272. * @param message
  273. * @param sessionId
  274. */
  275. saveMessageBySession(message, sessionId) {
  276. let self = this;
  277. let messages = new Messages();
  278. let participants = new Participants();
  279. let session_key = super.makeRedisKey(RedisKeys.Session, sessionId);
  280. let message_id = mongoose.Types.ObjectId().toString();
  281. let session_type = 0;
  282. let name = "";
  283. participants.existsParticipant(sessionId, message.senderId, function (res) {
  284. //校验发送成员是都在讨论组
  285. if (res) {
  286. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  287. session_type = res[0];
  288. name = res[1];
  289. if (!session_type || !name) {
  290. log.error("session is error for key " + session_key);
  291. throw "session is not found";
  292. }
  293. }).then(function (res) {
  294. //更新消息相关
  295. return messages.saveMessageToRedis(message_id, sessionId, message);
  296. }).then(function (res) {
  297. //更新session的最后一条聊天记录
  298. return self.updateLastContent(session_key, session_type, name, message);
  299. }).then(function (res) {
  300. //操作mysql数据库
  301. messages.saveMessageToMysql(message, session_type, message_id, sessionId);
  302. //返回数据给前端。
  303. ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "发送成功!"});
  304. //消息推送
  305. }).catch(function (res) {
  306. ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
  307. })
  308. } else {
  309. ModelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "用户不在此会话当中!"});
  310. }
  311. })
  312. }
  313. /**
  314. * 保存消息
  315. *
  316. * @param message
  317. * @param sessionId
  318. */
  319. saveMessageByTopic(message, sessionId,handler) {
  320. let self = this;
  321. let messages = new Messages();
  322. let participants = new Participants();
  323. let session_key = super.makeRedisKey(RedisKeys.Session, sessionId);
  324. let message_id = mongoose.Types.ObjectId().toString();
  325. let session_type = 0;
  326. let name = "";
  327. participants.existsParticipant(sessionId, message.senderId, function (err,res) {
  328. //校验发送成员是都在讨论组
  329. if (res) {
  330. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  331. session_type = res[0];
  332. name = res[1];
  333. if (!session_type || !name) {
  334. log.error("session is error for key " + session_key);
  335. throw "session is not found";
  336. }
  337. }).then(function (res) {
  338. //更新消息相关
  339. return messages.saveMessageForRedis(message_id, sessionId, message);
  340. }).then(function (res) {
  341. //更新session的最后一条聊天记录
  342. return self.updateLastContent(session_key, session_type, name, message);
  343. }).then(function (res) {
  344. //操作mysql数据库
  345. messages.saveMessageToMysql(message, session_type, message_id, sessionId);
  346. //返回数据给前端。
  347. handler(null,message_id)
  348. //消息推送
  349. }).catch(function (res) {
  350. handler(res,message_id)
  351. })
  352. } else {
  353. handler( "用户不在此会话当中!",message_id);
  354. }
  355. })
  356. }
  357. /**
  358. * 置顶操作
  359. */
  360. stickSession(sessionId, user) {
  361. let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, user);
  362. let self = this;
  363. //取出最大的session
  364. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  365. //获取该session的时间搓
  366. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  367. let nowtime = new Date().getTime();
  368. //当前时间搓比redis的时间搓更早证明没有置顶过
  369. if (scoreres <= nowtime) {
  370. //初始化置顶
  371. redis.zaddAsync(user_session_key, Commons.STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  372. log.info("stickSession:" + sessionId + ",res:" + res);
  373. ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
  374. }).then(function () {
  375. SessionRepo.saveStickySession(sessionId, user, Commons.STICKY_SESSION_BASE_SCORE);
  376. })
  377. } else {
  378. //已有置顶的数据,取出来加1保存回去
  379. scoreres = Number(scoreres) + 1;
  380. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  381. log.info("stickSession:" + sessionId + ",res:" + res);
  382. ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
  383. }).then(function () {
  384. SessionRepo.saveStickySession(sessionId, user, scoreres);
  385. })
  386. }
  387. })
  388. })
  389. }
  390. /**
  391. * 取消会话置顶
  392. */
  393. cancelStickSession(sessionId, user) {
  394. let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, user);
  395. let participants_key = super.makeRedisKey(RedisKeys.Participants, sessionId);
  396. let self = this;
  397. redis.zscoreAsync(participants_key, user).then(function (res) {
  398. if (!res) {
  399. res = new Date().getTime();
  400. }
  401. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  402. log.info("cancelStickSession:" + sessionId);
  403. ModelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "取消置顶成功!"});
  404. }).then(function () {
  405. SessionRepo.unstickSession(sessionId, user);
  406. });
  407. })
  408. }
  409. }
  410. // Expose class
  411. module.exports = Sessions;