sessions.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let redisClient = RedisClient.redisClient();
  7. let redis = redisClient.connection;
  8. let RedisModel = require('./../redis.model.js');
  9. let modelUtil = require('../../util/model.util');
  10. let Messages = require('../messages/messages');
  11. let Participants = require('./Participants');
  12. let log = require('../../util/log.js');
  13. const RedisKeys = require('../../include/commons').REDIS_KEYS;
  14. const Commons = require('../../include/commons');
  15. let configFile = require('../../include/commons').CONFIG_FILE;
  16. let config = require('../../resources/config/' + configFile);
  17. let SessionRepo = require('../../repository/mysql/session.repo');
  18. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  19. let mongoose = require('mongoose');
  20. class Sessions extends RedisModel {
  21. constructor() {
  22. super();
  23. }
  24. /**
  25. * 创建会话
  26. *
  27. * type = 1 sessionid = md5(patientId); MUC
  28. * type = 2 sessionId = hash(user1,user2); P2P
  29. * type = 3 sessionId = groupid; 团队群聊
  30. * @param sessionId 会话ID
  31. * @param name 会话名称
  32. * @param type 会话类型
  33. * @param users 会话成员
  34. */
  35. createSession(sessionId, name, type, users) {
  36. let self = this;
  37. let _super = super.makeRedisKey;
  38. users = eval("["+users+"]")[0];
  39. if (type == 2) {//P2P消息用hash校验
  40. var userArray=[];
  41. for(var key in users){
  42. userArray.push(key);
  43. }
  44. if(userArray.length>2){
  45. modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "会话人数超过2个无法创建P2P会话!"});
  46. return false;
  47. }
  48. ParticipantRepo.findSessionIdByParticipantIds(userArray[0],userArray[0],function(err,res){
  49. sessionId = res;
  50. callcreate(sessionId);
  51. })
  52. }else{
  53. callcreate();
  54. }
  55. function callcreate(){
  56. let createDate = new Date();
  57. let session_key = _super(RedisKeys.Session, sessionId);
  58. let participants = new Participants();
  59. // 将session加入redis
  60. participants.saveParticipantsToRedis(sessionId, users, createDate, function (res) {
  61. if (!res) {
  62. modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
  63. } else {
  64. let messages = {};
  65. messages.senderId = "system";
  66. messages.senderName = "系统消息";
  67. messages.timestamp = createDate;
  68. messages.content = "";
  69. messages.contentType = "1";
  70. self.updateLastContent(session_key, type, name, messages);
  71. modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "session create success!"});
  72. self.saveSessionToMysql(sessionId, name, type, createDate);
  73. participants.saveParticipantsToMysql(sessionId, users); //创建session成员到数据库
  74. }
  75. })
  76. }
  77. }
  78. /**
  79. * 保存session到MySQL
  80. * @param sessionId
  81. * @param name
  82. * @param type
  83. * @param createDate
  84. */
  85. saveSessionToMysql(sessionId, name, type, createDate) {
  86. SessionRepo.saveSession(sessionId, name, type, createDate);
  87. }
  88. /**
  89. * 获取某个用户的全部session列表
  90. * @param userId
  91. * @param handler
  92. */
  93. getUserSessionsFromMysql(userId, handler) {
  94. SessionRepo.findAll(userId, handler);
  95. }
  96. /**
  97. * 获取session单个对象
  98. * @param sessionId
  99. * @param handler
  100. */
  101. getSessions(sessionId, handler) {
  102. SessionRepo.findOne(sessionId, handler);
  103. }
  104. /**
  105. * 根据用户ID获取用户的session列表
  106. * @param userId
  107. * @param page
  108. * @param pagesize
  109. */
  110. getUserSessions(userId, page, pagesize) {
  111. let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, userId);
  112. let self = this;
  113. let _super = super.makeRedisKey;
  114. if (page > 0) {
  115. page = page * pagesize;
  116. pagesize = pagesize + page;
  117. }
  118. //倒序
  119. redis.zrevrangeAsync(user_session_key, page, pagesize).then(function (res) {
  120. let sessionlist = [];
  121. if (res.length == 0) {
  122. modelUtil.emitData(self.eventEmitter, {"status": 200, "data": res});
  123. } else {
  124. for (var j in res) {
  125. calllist(res[j], j, res.length);
  126. }
  127. }
  128. function calllist(session, j, _len) {
  129. let session_key = _super(RedisKeys.Session, session);
  130. redis.hgetallAsync(session_key).then(function (res) {
  131. let participants_key = _super(RedisKeys.Participants, session);
  132. //当前用户最后一次登录改讨论组时间
  133. redis.zscoreAsync(participants_key, userId).then(function (restimestamp) {
  134. //时间差获取消息数量
  135. callamount(res, j, _len, session, restimestamp);
  136. })
  137. }).catch(function (err) {
  138. throw err;
  139. })
  140. }
  141. /**
  142. * 消息统计
  143. * @param res 返回的会话列表
  144. * @param j 当前会话列表的位置
  145. * @param _len 列表长度 用做返回前端操作
  146. * @param session 当前会话
  147. * @param restimestamp 当前会话当前用户的最后一次时间搓
  148. */
  149. function callamount(res, j, _len, session, restimestamp) {
  150. let message_time_key = _super(RedisKeys.MessagesByTimestamp, session);
  151. redis.zrangebyscoreAsync(message_time_key, restimestamp, (new Date().getTime())).then(function (messagetimelist) {
  152. res.sessionId = session;
  153. res.message = messagetimelist.length;
  154. callrole(res, j, _len,session);
  155. }).catch(function (err) {
  156. throw err;
  157. })
  158. }
  159. /**
  160. * 用户角色
  161. * @param res要返回的JSON
  162. * @param j 第N调数据
  163. * @param _len 总数据长度
  164. */
  165. function callrole(res, j, _len,session){
  166. let participants_role_key = _super(RedisKeys.ParticipantsRole, session);
  167. redis.hgetAsync(participants_role_key, userId).then(function(role){
  168. res.role=role;
  169. callback(res, j, _len);
  170. })
  171. }
  172. /**
  173. * 列表封装完毕后由此回调返回数据界面
  174. * @param res
  175. * @param j
  176. * @param _len
  177. */
  178. function callback(res, j, _len) {
  179. sessionlist.push(res);
  180. if (j == (_len - 1)) {
  181. modelUtil.emitData(self.eventEmitter, {"status": 200, "data": sessionlist});
  182. }
  183. }
  184. }).catch(function (res) {
  185. modelUtil.emitData(self.eventEmitter, "get list error " + res + ",user:" + userId);
  186. })
  187. }
  188. /**
  189. * 根据会话中的消息
  190. *
  191. * @param sessionId 会话ID
  192. * @param user 拉取消息的人
  193. * @param page 第几页
  194. * @param pagesize 分页数量
  195. */
  196. getMessages(sessionId, user, page, pagesize) {
  197. let self = this;
  198. let message_timestamp_key = super.makeRedisKey(RedisKeys.MessagesByTimestamp, sessionId);
  199. let message_key = super.makeRedisKey(RedisKeys.Messages, sessionId);
  200. let participants_key = super.makeRedisKey(RedisKeys.Participants, sessionId);
  201. //超过最大限制后从mysql获取数据
  202. if (page * pagesize >= config.sessionConfig.maxMessageCount) {
  203. let message = new Messages();
  204. message.getMessageByPage(sessionId, page, pagesize, function (err, res) {
  205. if (!err) {
  206. modelUtil.emitData(self.eventEmitter, {"status": 200, "data": res});
  207. } else {
  208. modelUtil.emitData(self.eventEmitter, {"status": -1, "data": err});
  209. }
  210. })
  211. } else {
  212. if (page > 0) {
  213. page = page * pagesize;
  214. pagesize = pagesize + page;
  215. }
  216. let participants = new Participants();
  217. participants.existsParticipant(sessionId, user, function (res) {
  218. if (!res) {
  219. modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "用户不在此会话中!"});
  220. } else {
  221. //倒序取出最后N条消息
  222. redis.zrevrangeAsync(message_timestamp_key, page, pagesize).then(function (res) {
  223. //取出消息实体
  224. if (res.length == 0) {
  225. modelUtil.emitData(self.eventEmitter, {"status": 200, "data": []});
  226. return;
  227. }
  228. redis.hmgetAsync(message_key, res).then(function (messages) {
  229. console.log(messages)
  230. //将取到的消息返回给前端
  231. modelUtil.emitData(self.eventEmitter, {"status": 200, "data": messages});
  232. }).then(function () {
  233. //更新患者最后一次获取消息的日期
  234. redis.zaddAsync(participants_key, (new Date().getTime()), user).then(function (res) {
  235. console.log(res);
  236. }).catch(function (res) {
  237. throw res;
  238. })
  239. })
  240. }).catch(function (res) {
  241. modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
  242. })
  243. }
  244. })
  245. }
  246. }
  247. /**
  248. * 更新会话最后一条消息
  249. *
  250. * @param session_key rediskey
  251. * @param session_type
  252. * @param name 议题名称
  253. * @param message
  254. * @returns {*}
  255. */
  256. updateLastContent(session_key, session_type, name, message) {
  257. return redis.hmsetAsync(session_key,
  258. "create_date", message.timestamp,
  259. "last_content", message.content,
  260. "last_content_type", message.contentType,
  261. "type", session_type,
  262. "senderId", message.senderId,
  263. "senderName", message.senderName,
  264. "name", name
  265. );
  266. }
  267. /**
  268. * 保存消息
  269. *
  270. * @param message
  271. * @param sessionId
  272. */
  273. saveMessageBySession(message, sessionId) {
  274. let self = this;
  275. let messages = new Messages();
  276. let participants = new Participants();
  277. let session_key = super.makeRedisKey(RedisKeys.Session, sessionId);
  278. let message_id = mongoose.Types.ObjectId().toString();
  279. let session_type = 0;
  280. let name = "";
  281. participants.existsParticipant(sessionId, message.senderId, function (res) {
  282. //校验发送成员是都在讨论组
  283. if (res) {
  284. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  285. session_type = res[0];
  286. name = res[1];
  287. if (!session_type || !name) {
  288. log.error("session is error for key " + session_key);
  289. throw "session is not found";
  290. }
  291. }).then(function (res) {
  292. //更新消息相关
  293. return messages.saveMessageForRedis(message_id, sessionId, message);
  294. }).then(function (res) {
  295. //更新session的最后一条聊天记录
  296. return self.updateLastContent(session_key, session_type, name, message);
  297. }).then(function (res) {
  298. //操作mysql数据库
  299. messages.saveMessageToMysql(message, session_type, message_id, sessionId);
  300. //返回数据给前端。
  301. modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "发送成功!"});
  302. //消息推送
  303. }).catch(function (res) {
  304. modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": res});
  305. })
  306. } else {
  307. modelUtil.emitData(self.eventEmitter, {"status": -1, "msg": "用户不在此会话当中!"});
  308. }
  309. })
  310. }
  311. /**
  312. * 置顶操作
  313. */
  314. stickSession(sessionId, user) {
  315. let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, user);
  316. let self = this;
  317. //取出最大的session
  318. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  319. //获取该session的时间搓
  320. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  321. let nowtime = new Date().getTime();
  322. //当前时间搓比redis的时间搓更早证明没有置顶过
  323. if (scoreres <= nowtime) {
  324. //初始化置顶
  325. redis.zaddAsync(user_session_key, Commons.STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  326. log.info("stickSession:" + sessionId + ",res:" + res);
  327. modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
  328. }).then(function () {
  329. SessionRepo.saveStickySession(sessionId, user, Commons.STICKY_SESSION_BASE_SCORE);
  330. })
  331. } else {
  332. //已有置顶的数据,取出来加1保存回去
  333. scoreres = Number(scoreres) + 1;
  334. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  335. log.info("stickSession:" + sessionId + ",res:" + res);
  336. modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "置顶成功!"});
  337. }).then(function () {
  338. SessionRepo.saveStickySession(sessionId, user, scoreres);
  339. })
  340. }
  341. })
  342. })
  343. }
  344. /**
  345. * 取消会话置顶
  346. */
  347. cancelStickSession(sessionId, user) {
  348. let user_session_key = super.makeRedisKey(RedisKeys.UserSessions, user);
  349. let participants_key = super.makeRedisKey(RedisKeys.Participants, sessionId);
  350. let self = this;
  351. redis.zscoreAsync(participants_key, user).then(function (res) {
  352. if (!res) {
  353. res = new Date().getTime();
  354. }
  355. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  356. log.info("cancelStickSession:" + sessionId);
  357. modelUtil.emitData(self.eventEmitter, {"status": 200, "msg": "取消置顶成功!"});
  358. }).then(function () {
  359. SessionRepo.unstickSession(sessionId, user);
  360. });
  361. })
  362. }
  363. }
  364. // Expose class
  365. module.exports = Sessions;