sessions.js 44 KB


  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Users = require('../user/users');
  10. let Participants = require('./participants');
  11. let SessionRepo = require('../../repository/mysql/session.repo');
  12. let TopicRepo = require('../../repository/mysql/topics.repo');
  13. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  14. let WechatClient = require("../client/wechat.client.js");
  15. let AppClient = require("../client/app.client.js");
  16. let configFile = require('../../include/commons').CONFIG_FILE;
  17. let config = require('../../resources/config/' + configFile);
  18. let redis = RedisClient.redisClient().connection;
  19. let logger = require('../../util/log.js');
  20. let mongoose = require('mongoose');
  21. let async = require("async");
  22. let log = require("../../util/log.js");
  23. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  24. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  25. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  26. const PARTICIPANT_ROLES = require('../../include/commons').PARTICIPANT_ROLES;
  27. class Sessions extends RedisModel {
  28. constructor() {
  29. super();
  30. }
  31. /**
  32. * 创建会话。会话的ID来源:
  33. * MUC:患者的ID
  34. * P2P:对成员的ID排序后,取hash值
  35. * GROUP:团队的ID
  36. *
  37. * @param sessionId
  38. * @param name 会话名称
  39. * @param type 会话类型
  40. * @param participantArray 会话成员
  41. * @param handler 回调,仅MUC模式使用
  42. */
  43. createSession(sessionId, name, type, participantArray, handler) {
  44. let self = this;
  45. let messageId = mongoose.Types.ObjectId().toString();
  46. //创建session到mysql
  47. self.createSessionToMysql(sessionId, name, type, participantArray, messageId, function (err, res) {
  48. if (err) {
  49. if (handler) {
  50. handler(err, null);
  51. return;
  52. }
  53. ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
  54. } else {
  55. //创建session到redis
  56. self.createSessionToRedis(sessionId, name, type, participantArray, messageId, function (err, res) {
  57. if (err) {
  58. if (handler) {
  59. handler(err, null);
  60. return;
  61. }
  62. ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
  63. } else {
  64. //更新成为进行中的会话
  65. self.updateSessionStatus(sessionId,0);
  66. if (handler) {
  67. handler(null, res);
  68. return;
  69. }
  70. ModelUtil.emitOK(self.eventEmitter, {status: 200, data: res});
  71. }
  72. });
  73. }
  74. });
  75. }
  76. /**
  77. * 创建会话。REDIS
  78. * @param sessionId
  79. * @param name
  80. * @param type
  81. * @param participantArray
  82. * @param messageId
  83. * @param handler
  84. * @returns {boolean}
  85. */
  86. createSessionToRedis(sessionId, name, type, participantArray, messageId, handler) {
  87. let self = this;
  88. let messages = new Messages();
  89. let participantIdArray = [];
  90. for (let i in participantArray) {
  91. participantIdArray.push(participantArray[i].split(":")[0]);
  92. }
  93. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  94. if (sessionId) {
  95. callBusinessType(sessionId);
  96. return;
  97. }
  98. if (participantIdArray.length != 2) {
  99. handler("P2P session only allow 2 participants.", null);
  100. return false;
  101. }
  102. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  103. sessionId = res;
  104. callBusinessType(sessionId);
  105. });
  106. } else {
  107. if (!sessionId) {
  108. handler("MUC OR GROUP session sessionId is not allow null .", null);
  109. return;
  110. }
  111. callBusinessType(sessionId);
  112. }
  113. function callBusinessType(sessionId) {
  114. ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) {
  115. callCreate(sessionId, businessType);
  116. });
  117. }
  118. function callCreate(sessionId, businessType) {
  119. let createDate = new Date();
  120. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  121. let message = {
  122. sender_id: "system",
  123. sender_name: "system",
  124. content_type: 11,
  125. content: "会话创建成功",
  126. timestamp: createDate,
  127. id: messageId
  128. };
  129. if (type == SESSION_TYPES.MUC) {
  130. businessType = 2;
  131. }
  132. let session = {
  133. id: sessionId,
  134. name: name,
  135. type: type,
  136. create_date: createDate.getTime(),
  137. business_type: businessType,
  138. last_sender_id: message.sender_id,
  139. last_sender_name: message.sender_name,
  140. last_message_time: message.timestamp.getTime(),
  141. last_content: message.content,
  142. last_content_type: message.content_type
  143. };
  144. redis.hmsetAsync(sessionKey, session).then(function () {
  145. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  146. handler(null, session);
  147. //messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message);
  148. });
  149. })
  150. }
  151. }
  152. /**
  153. * 创建会话。mysql
  154. * @param sessionId
  155. * @param name
  156. * @param type
  157. * @param participantArray
  158. * @param messageId
  159. * @param handler
  160. */
  161. createSessionToMysql(sessionId, name, type, participantArray, messageId, handler) {
  162. let self = this;
  163. //如果sessionId不存在则执行创建sessionId过程
  164. let participantIdArray = [];
  165. for (let i in participantArray) {
  166. participantIdArray.push(participantArray[i].split(":")[0]);
  167. }
  168. //流程1-判断是否存在sessionId不存在则创建对应的sessionId;
  169. if (!sessionId) {
  170. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  171. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  172. sessionId = res;
  173. callBusinessType();
  174. });
  175. } else {
  176. return handler("MUC模式和团队模式,不允许sessionId为空!", null);
  177. }
  178. } else {
  179. callBusinessType();
  180. }
  181. //流程2-判断session的业务类型;
  182. function callBusinessType() {
  183. ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) {
  184. if (err) {
  185. handler(err, null);
  186. return;
  187. }
  188. callCreateSession(businessType);
  189. });
  190. }
  191. //流程3-发起session创建 返回session实例
  192. function callCreateSession(businessType) {
  193. //查找该sessionId是否存在存在则直接返回实例
  194. SessionRepo.findOne(sessionId, function (err, res) {
  195. if (res.length > 0) {//已经存在
  196. //更新成员
  197. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, update) {
  198. handler(null, res[0]);
  199. return;
  200. })
  201. } else {
  202. let createDate = new Date();
  203. let session = {
  204. id: sessionId,
  205. name: name,
  206. type: type,
  207. create_date: createDate.getTime(),
  208. business_type: businessType
  209. };
  210. //将session写入数据库
  211. self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
  212. if (err) {
  213. handler(err, null);
  214. return;
  215. }
  216. callCreateParticipants(session);
  217. })
  218. }
  219. });
  220. }
  221. //流程4-发起session成员创建
  222. function callCreateParticipants(session) {
  223. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  224. if (err) {
  225. handler(err, null);
  226. return;
  227. } else {
  228. handler(null, session);
  229. return;
  230. }
  231. })
  232. }
  233. }
  234. /**
  235. * 最近会话列表,7天内。
  236. *
  237. * @param userId
  238. * @param dateSpan
  239. */
  240. getRecentSessions(userId, dateSpan) {
  241. let self = this;
  242. SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) {
  243. if (err) {
  244. ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err);
  245. return;
  246. }
  247. let sessions = [];
  248. res.forEach(function (session) {
  249. sessions.push({
  250. id: session.id,
  251. name: session.name,
  252. type: session.type,
  253. business_type: session.business_type,
  254. create_date: session.create_date
  255. })
  256. });
  257. ModelUtil.emitOK(self.eventEmitter, sessions);
  258. });
  259. }
  260. /**
  261. * 保存session到MySQL
  262. * @param sessionId
  263. * @param name
  264. * @param type
  265. * @param createDate
  266. * @param businessType
  267. * @param handler
  268. */
  269. saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {
  270. SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler);
  271. }
  272. /**
  273. * 获取某个用户的全部session列表
  274. * @param userId
  275. * @param handler
  276. */
  277. getUserSessionsFromMysql(userId, handler) {
  278. SessionRepo.findAll(userId, handler);
  279. }
  280. /**
  281. * 获取session单个对象
  282. * @param sessionId
  283. * @param handler
  284. */
  285. getSessions(sessionId, handler) {
  286. SessionRepo.findOne(sessionId, handler);
  287. }
  288. /**
  289. * 根据用户ID获取用户的session列表
  290. * @param userId
  291. * @param page
  292. * @param size
  293. * @param businessType
  294. */
  295. getUserSessions(userId, page, size, businessType) {
  296. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  297. let self = this;
  298. if (page > 0) {
  299. if (page == 1) {
  300. page = 0;
  301. }
  302. page = page + page * size;
  303. }
  304. async.waterfall([
  305. // 获取会话ID列表
  306. function (callback) {
  307. redis.zrevrangeAsync(userSessionKey, page, size)
  308. .then(function (sessionIds) {
  309. if (sessionIds.length == 0) {
  310. ModelUtil.emitOK(self.eventEmitter, []);
  311. return;
  312. }
  313. callback(null, sessionIds);
  314. })
  315. },
  316. // 遍历会话
  317. function (sessionIds) {
  318. let sessionList = [];
  319. let functionList = [];
  320. for (let j = 0; j < sessionIds.length; j++) {
  321. let fun = function (index, callback) {
  322. if (!callback) {
  323. callback = index, index = 0
  324. }
  325. let sessionId = sessionIds[index];
  326. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  327. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  328. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  329. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  330. redis.multi()
  331. .hgetall(sessionKey) // 会话实体
  332. .hget(participantsRoleKey, userId) // 用户在此会话中的角色
  333. .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
  334. .zrange(participantsKey, 0, -1)
  335. .zrange(sessionParticipantsKey, 0,-1,'withscores') // 所有用户在此会话中最后一次获取未读消息的时间
  336. .execAsync()
  337. .then(function (res) {
  338. let session = res[0];
  339. let role = res[1];
  340. let lastFetchTime = res[2];
  341. let users = res[3];
  342. let participantsTimeArray = res[4];
  343. let participantsTime = [];
  344. for(var j = 0 ;j<participantsTimeArray.length;j++){
  345. if(j%2!=0)continue;
  346. let participantsTimeJson = {};
  347. participantsTimeJson[participantsTimeArray[j]] = participantsTimeArray[j+1];
  348. participantsTime.push(participantsTimeJson);
  349. }
  350. let sessionName = "";
  351. let otherUserId = "";
  352. if (session.type == SESSION_TYPES.P2P) {
  353. for (let j in users) {
  354. if (users[j] != userId) {
  355. otherUserId = users[j];
  356. }
  357. }
  358. }
  359. if (!role) role = 0;
  360. if (!lastFetchTime) lastFetchTime = new Date().getTime();
  361. // 计算未读消息数
  362. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  363. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
  364. .then(function (count) {
  365. if (!otherUserId) otherUserId = userId;
  366. ParticipantRepo.findNameById(otherUserId, function (err, res) {
  367. if ((res && res.length == 0) || session.type != SESSION_TYPES.P2P) {
  368. sessionName = session.name;
  369. } else {
  370. sessionName = res[0].name;
  371. }
  372. var bir = new Date().getTime();
  373. if (res.length != 0 && res[0].birthdate) {
  374. bir = res[0].birthdate.getTime();
  375. }
  376. var sex = 1;
  377. if (res.length != 0 && res[0].sex) {
  378. sex = res[0].sex;
  379. }
  380. sessionList.push({
  381. id: sessionId,
  382. name: sessionName,
  383. create_date: session.create_date,
  384. last_content_type: session.last_content_type,
  385. last_content: session.last_content,
  386. sender_id: session.sender_id,
  387. type: session.type,
  388. sender_name: session.sender_name,
  389. unread_count: count,
  390. business_type: session.business_type,
  391. my_role: role,
  392. sender_sex: sex,
  393. sender_birthday: bir,
  394. participantsTimeArray:participantsTime
  395. });
  396. index = (parseInt(index) + 1);
  397. if (index == sessionIds.length) {
  398. ModelUtil.emitOK(self.eventEmitter, sessionList);
  399. } else {
  400. callback(null, index);
  401. }
  402. })
  403. })
  404. })
  405. .catch(function (err) {
  406. logger.error("Get sessions failed: ", err);
  407. });
  408. };
  409. functionList.push(fun);
  410. }
  411. async.waterfall(functionList);
  412. }
  413. ]);
  414. }
  415. /**
  416. * 获取会话消息。全部,不管已读/未读状态。
  417. *
  418. * @param sessionId 会话ID
  419. * @param userId 拉取消息的人
  420. * @param page 第几页
  421. * @param pagesize 分页数量
  422. * @param start_msg_id 消息会话最新的一条消息的ID
  423. * @param end_msg_id 消息会话刚开始的消息ID
  424. */
  425. getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, handler) {
  426. let self = this;
  427. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  428. if (!start_msg_id && !end_msg_id) {
  429. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  430. if (res.length == 0) {
  431. if (handler) {
  432. handler(null, res);
  433. return;
  434. }
  435. ModelUtil.emitOK(self.eventEmitter, res);
  436. return;
  437. }
  438. start_msg_id = res[0];
  439. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  440. if (res.length == 0) {
  441. if (handler) {
  442. handler(null, res);
  443. return;
  444. }
  445. ModelUtil.emitOK(self.eventEmitter, res);
  446. return;
  447. }
  448. end_msg_id = res[0];
  449. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  450. if (err) {
  451. if (handler) {
  452. handler(err, null);
  453. return;
  454. }
  455. logger.error("getMessagesByPage error" + err);
  456. ModelUtil.emitError(self.eventEmitter, err, err);
  457. } else {
  458. if (handler) {
  459. handler(null, res);
  460. return;
  461. }
  462. ModelUtil.emitOK(self.eventEmitter, res);
  463. }
  464. })
  465. })
  466. })
  467. } else if (!start_msg_id) {
  468. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  469. if (res.length == 0) {
  470. if (handler) {
  471. handler(null, res);
  472. return;
  473. }
  474. ModelUtil.emitOK(self.eventEmitter, res);
  475. return;
  476. }
  477. start_msg_id = res[0];
  478. self.getMessagesByPage(sessionId, user, start_msg_id,end_msg_id , page, pagesize, isoffset, function (err, res) {
  479. if (err) {
  480. if (handler) {
  481. handler(err, null);
  482. return;
  483. }
  484. logger.error("getMessagesByPage error" + err);
  485. ModelUtil.emitError(self.eventEmitter, err, err);
  486. } else {
  487. if (handler) {
  488. handler(null, res);
  489. return;
  490. }
  491. ModelUtil.emitOK(self.eventEmitter, res);
  492. }
  493. })
  494. })
  495. } else if (!end_msg_id) {
  496. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  497. if (res.length == 0) {
  498. ModelUtil.emitOK(self.eventEmitter, res);
  499. return;
  500. }
  501. end_msg_id = res[0];
  502. self.getMessagesByPage(sessionId, user, end_msg_id,start_msg_id, page, pagesize, isoffset, function (err, res) {
  503. if (err) {
  504. if (handler) {
  505. handler(err, null);
  506. return;
  507. }
  508. logger.error("getMessagesByPage error" + err);
  509. ModelUtil.emitError(self.eventEmitter, err, err);
  510. } else {
  511. if (handler) {
  512. handler(null, res);
  513. return;
  514. }
  515. ModelUtil.emitOK(self.eventEmitter, res);
  516. }
  517. })
  518. })
  519. } else {
  520. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  521. if (err) {
  522. if (handler) {
  523. handler(err, null);
  524. return;
  525. }
  526. logger.error("getMessagesByPage error" + err);
  527. ModelUtil.emitError(self.eventEmitter, err, err);
  528. } else {
  529. if (handler) {
  530. handler(null, res);
  531. return;
  532. }
  533. ModelUtil.emitOK(self.eventEmitter, res);
  534. }
  535. })
  536. }
  537. }
  538. /**
  539. * 分页获取会话消息。
  540. *
  541. * @param sessionId 必选。会话ID
  542. * @param userId 必选。用户ID
  543. * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据
  544. * @param endMsgId 必选。会话中的结束消息ID
  545. * @param page 必选。页码
  546. * @param size 必选。页面大小
  547. * @param handler 必选。回调 如果是根据结束消息获取是否有新消息返回时正序的数据
  548. */
  549. getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
  550. let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  551. let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  552. let participants = new Participants();
  553. let offset = (page - 1 < 0 ? 0 : page - 1) * size;
  554. let count = size;
  555. if (page > 1 || isoffset == 1) {
  556. offset += 1; // 翻页由于闭区间,需跳过本身数据
  557. }
  558. participants.existsParticipant(sessionId, userId, function (err, res) {
  559. if (!res) {
  560. handler(Error("User not found in session " + sessionId), null);
  561. } else {
  562. //将消息ID转换成分值
  563. redis.multi()
  564. .zscore(messagesTimestampKey, startMsgId)
  565. .zscore(messagesTimestampKey, endMsgId)
  566. .execAsync()
  567. .then(function (res) {
  568. let startMsgScore = res[1];
  569. let endMsgScore = res[0];
  570. if (startMsgScore == null || endMsgScore == null || (startMsgScore == endMsgScore && isoffset == 1)) {
  571. handler(null, []);
  572. return;
  573. }
  574. if(endMsgScore>startMsgScore){
  575. redis.zrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  576. .then(function (res) {
  577. if (res.length == 0) {
  578. handler(null, []);
  579. return;
  580. }
  581. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  582. handler(null, messages);
  583. }).then(function () {
  584. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  585. })
  586. })
  587. .catch(function (err) {
  588. logger.error("Get message by page failed: ", err);
  589. handler(err, false);
  590. })
  591. }else{
  592. // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
  593. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  594. .then(function (res) {
  595. if (res.length == 0) {
  596. handler(null, []);
  597. return;
  598. }
  599. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  600. handler(null, messages);
  601. }).then(function () {
  602. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  603. })
  604. })
  605. .catch(function (err) {
  606. logger.error("Get message by page failed: ", err);
  607. handler(err, false);
  608. })
  609. }
  610. })
  611. }
  612. })
  613. }
  614. /**
  615. * 获取所有会话的未读消息数。
  616. */
  617. getAllSessionsUnreadMessageCount(userId) {
  618. let self = this;
  619. let count = 0;
  620. let patientCount = 0;
  621. let doctorCount = 0;
  622. SessionRepo.findAll(userId, function (err, res) {
  623. if (err) {
  624. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  625. return;
  626. }
  627. if (res.length == 0) {
  628. ModelUtil.emitOK(self.eventEmitter, {count: count});
  629. return;
  630. }
  631. for (let j in res) {
  632. if (res[j].type == SESSION_TYPES.SYSTEM) {
  633. if (j == res.length - 1) {
  634. ModelUtil.emitOK(self.eventEmitter, {count: count});
  635. }
  636. continue;
  637. }
  638. callback(res, j, res[j]);
  639. }
  640. });
  641. function callback(res, j, session) {
  642. self.getSessionUnreadMessageCount(res[j].id, userId, function (err, con) {
  643. if (err) {
  644. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  645. }
  646. count = count + con;
  647. if (session.type == 2) {
  648. patientCount = patientCount + con;
  649. } else {
  650. doctorCount = doctorCount + con;
  651. }
  652. if (j == res.length - 1) {
  653. ModelUtil.emitOK(self.eventEmitter, {count: count, patient: patientCount, doctor: doctorCount});
  654. }
  655. })
  656. }
  657. }
  658. /**
  659. * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
  660. *
  661. * @param sessionId
  662. * @param userId
  663. */
  664. getSessionUnreadMessageCount(sessionId, userId, handler) {
  665. let self = this;
  666. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  667. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  668. async.waterfall([
  669. // 此成员最后获取消息的时间
  670. function (callback) {
  671. redis.zscoreAsync(participantsKey, userId)
  672. .then(function (lastFetchTime) {
  673. callback(null, lastFetchTime);
  674. })
  675. },
  676. // 计算最后获取消息的时间之后到现在有多少条消息
  677. function (lastFetchTime, callback) {
  678. if (!lastFetchTime) lastFetchTime = 0;
  679. let now = new Date().getTime();
  680. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
  681. .then(function (count) {
  682. if (handler) {
  683. handler(null, count);
  684. } else {
  685. ModelUtil.emitOK(self.eventEmitter, {count: count});
  686. }
  687. })
  688. }
  689. ], function (err, res) {
  690. if (err) {
  691. if (handler) {
  692. handler(err, 0);
  693. } else {
  694. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  695. }
  696. }
  697. });
  698. }
  699. /**
  700. * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
  701. */
  702. getSessionUnreadMessages(sessionId, userId) {
  703. let self = this;
  704. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  705. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  706. async.waterfall([
  707. // 此成员最后获取消息的时间
  708. function (callback) {
  709. redis.zscoreAsync(participantsKey, userId)
  710. .then(function (lastFetchTime) {
  711. callback(null, lastFetchTime);
  712. })
  713. },
  714. // 最后获取消息的时间之后到现在的消息ID列表
  715. function (lastFetchTime, callback) {
  716. if (!lastFetchTime) lastFetchTime = 0;
  717. let now = new Date().getTime();
  718. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
  719. .then(function (messageIds) {
  720. callback(null, messageIds);
  721. })
  722. },
  723. // 获取消息
  724. function (messageIds, callback) {
  725. if (messageIds.length == 0) {
  726. ModelUtil.emitOK(self.eventEmitter, []);
  727. return;
  728. }
  729. let startMsgId = messageIds[0];
  730. let endMsgId = messageIds[messageIds.length - 1];
  731. self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
  732. if (err) {
  733. ModelUtil.emitError(self.eventEmitter, err.message);
  734. return;
  735. }
  736. ModelUtil.emitOK(self.eventEmitter, res);
  737. });
  738. }
  739. ], function (err, res) {
  740. if (err) {
  741. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  742. }
  743. });
  744. }
  745. /**
  746. * 保存消息。
  747. *
  748. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  749. *
  750. * see also: saveMessageByTopic
  751. *
  752. * @param message
  753. * @param sessionId
  754. */
  755. saveMessageBySession(sessionId, message) {
  756. let self = this;
  757. let messages = new Messages();
  758. let participants = new Participants();
  759. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  760. let messageId = mongoose.Types.ObjectId().toString();
  761. message.id = messageId;
  762. // 检查会话中是否存在此成员
  763. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  764. if (err) {
  765. ModelUtil.emitError(self.eventEmitter, "Check session participant failed: ", err);
  766. return;
  767. }
  768. if (res) {
  769. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  770. let sessionType = res[0];
  771. let sessionName = res[1];
  772. if (sessionType == null) {
  773. ModelUtil.emitError(self.eventEmitter, "Session " + sessionId + " is not found.");
  774. return;
  775. }
  776. // 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间
  777. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  778. Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
  779. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  780. // 更新MYSQL中会话的最新状态,并保存消息
  781. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  782. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  783. if (err) {
  784. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  785. } else {
  786. message.timestamp = message.timestamp.getTime();
  787. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  788. }
  789. });
  790. }).then(function (res) {
  791. // 推送消息
  792. ParticipantRepo.findIds(sessionId, function (err, res) {
  793. if (err) {
  794. ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
  795. } else {
  796. message.session_id = sessionId;
  797. res.forEach(function (participant) {
  798. if (participant.id !== message.sender_id &&
  799. participant.participant_role == PARTICIPANT_ROLES.HOST) {
  800. Sessions.pushNotification(participant.id, participant.name, message);
  801. }
  802. });
  803. }
  804. })
  805. }).catch(function (err) {
  806. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  807. })
  808. } else {
  809. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  810. }
  811. });
  812. }
  813. sendTopicMessages(topicId, message) {
  814. let self = this;
  815. TopicRepo.findAllByTopicId(topicId, function (err, res) {
  816. if (err || res.length == 0) {
  817. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": "议题获取失败"});
  818. return;
  819. }
  820. self.saveMessageByTopic(message, res[0].session_id, function (err, messageId) {
  821. if (err) {
  822. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": err});
  823. } else {
  824. message.id = messageId;
  825. ModelUtil.emitOK(self.eventEmitter, {status: 200, "message": "发送成功", data: message});
  826. }
  827. });
  828. });
  829. }
  830. /**
  831. * 保存消息
  832. *
  833. * @param message
  834. * @param sessionId
  835. * @param handler
  836. */
  837. saveMessageByTopic(message, sessionId, handler) {
  838. let messages = new Messages();
  839. let participants = new Participants();
  840. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  841. let messageId = mongoose.Types.ObjectId().toString();
  842. let self = this;
  843. let sessionType = 0;
  844. let sessionName = "";
  845. message.id = messageId;
  846. // 发送成员必须处于会话中
  847. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  848. if (res) {
  849. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  850. sessionType = res[0];
  851. sessionName = res[1];
  852. if (!sessionType || !sessionName) {
  853. logger.error("Unknown session key " + session_key);
  854. if (handler) return handler(new Error("Unknown session key " + session_key));
  855. }
  856. }).then(function (res) {
  857. // 消息数据双写,并更新用户最后消息获取时间,会话新状态等
  858. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  859. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  860. // 更新会话最新状态及成员最后一次消息获取时间
  861. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  862. Messages.updateLastContent(session_key, sessionType, sessionName, message);
  863. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  864. if (handler) handler(null, messageId);
  865. }).then(function (res) {
  866. // 推送消息
  867. ParticipantRepo.findIds(sessionId, function (err, res) {
  868. if (err) {
  869. if (handler) handler(err, messageId)
  870. } else {
  871. message.session_id = sessionId;
  872. res.forEach(function (participant) {
  873. if (participant.id !== message.sender_id &&
  874. participant.participant_role == PARTICIPANT_ROLES.HOST) {
  875. Sessions.pushNotification(participant.id, participant.name, message);
  876. }
  877. });
  878. }
  879. })
  880. }).catch(function (err) {
  881. if (handler) handler(err, messageId)
  882. })
  883. } else {
  884. if (handler) handler("用户不在此会话当中!", messageId);
  885. }
  886. })
  887. }
  888. /**
  889. * 置顶操作
  890. */
  891. stickSession(sessionId, user) {
  892. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  893. let self = this;
  894. //取出最大的session
  895. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  896. //获取该session的时间搓
  897. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  898. let nowtime = new Date().getTime();
  899. //当前时间搓比redis的时间搓更早证明没有置顶过
  900. if (scoreres <= nowtime) {
  901. //初始化置顶
  902. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  903. logger.info("stickSession:" + sessionId + ",res:" + res);
  904. ModelUtil.emitOK(self.eventEmitter, {});
  905. }).then(function () {
  906. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  907. })
  908. } else {
  909. //已有置顶的数据,取出来加1保存回去
  910. scoreres = Number(scoreres) + 1;
  911. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  912. logger.info("stickSession:" + sessionId + ",res:" + res);
  913. ModelUtil.emitOK(self.eventEmitter, {});
  914. }).then(function () {
  915. SessionRepo.saveStickySession(sessionId, user, scoreres);
  916. })
  917. }
  918. })
  919. })
  920. }
  921. /**
  922. * 取消会话置顶
  923. */
  924. cancelStickSession(sessionId, user) {
  925. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  926. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  927. let self = this;
  928. redis.zscoreAsync(participants_key, user).then(function (res) {
  929. if (!res) {
  930. res = new Date().getTime();
  931. }
  932. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  933. logger.info("cancelStickSession:" + sessionId);
  934. ModelUtil.emitOK(self.eventEmitter, {});
  935. }).then(function () {
  936. SessionRepo.unstickSession(sessionId, user);
  937. });
  938. })
  939. }
  940. /**
  941. * 更新会话参与者的最后消息获取时间。
  942. *
  943. * @param sessionId
  944. * @param userId
  945. */
  946. static updateParticipantLastFetchTime(sessionId, userId, score) {
  947. score = score + 1;
  948. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  949. redis.zaddAsync(participantsKey, score, userId)
  950. .then(function (res) {
  951. ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) {
  952. if (err) {
  953. logger.error("Update participant last fetch time failed: ", err);
  954. }
  955. });
  956. })
  957. .catch(function (err) {
  958. logger.error("Update participant last fetch time failed: ", err);
  959. });
  960. }
  961. /**
  962. * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。
  963. *
  964. * @param targetUserId
  965. * @param message
  966. */
  967. static pushNotification(targetUserId, targetUserName, message) {
  968. Users.isPatientId(targetUserId, function (err, isPatient) {
  969. if (isPatient) {
  970. WechatClient.sendMessage(targetUserId, targetUserName, message);
  971. }
  972. else {
  973. AppClient.sendNotification(targetUserId, message);
  974. }
  975. });
  976. }
  977. /**
  978. * 针对MUC模式更新会话的当前状态
  979. * @param sessionId
  980. */
  981. updateSessionStatus(sessionId,status){
  982. let self = this;
  983. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session,sessionId);
  984. redis.hsetAsync(sessionKey,"status",status).then(function(res){
  985. if(!res){
  986. logger.error("set session status to redis is error !");
  987. }
  988. SessionRepo.updateSessionStatus(sessionId,status,function(err,sqlResult){
  989. if(err){
  990. logger.error("set session status to mysql is error !");
  991. }else{
  992. logger.info("set session status is success");
  993. }
  994. });
  995. });
  996. }
  997. }
  998. // Expose class
  999. module.exports = Sessions;