sessions.js 45 KB


  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Users = require('../user/users');
  10. let Participants = require('./participants');
  11. let SessionRepo = require('../../repository/mysql/session.repo');
  12. let TopicRepo = require('../../repository/mysql/topics.repo');
  13. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  14. let WechatClient = require("../client/wechat.client.js");
  15. let AppClient = require("../client/app.client.js");
  16. let configFile = require('../../include/commons').CONFIG_FILE;
  17. let config = require('../../resources/config/' + configFile);
  18. let redis = RedisClient.redisClient().connection;
  19. let logger = require('../../util/log.js');
  20. let mongoose = require('mongoose');
  21. let async = require("async");
  22. let log = require("../../util/log.js");
  23. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  24. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  25. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  26. const PARTICIPANT_ROLES = require('../../include/commons').PARTICIPANT_ROLES;
  27. class Sessions extends RedisModel {
  28. constructor() {
  29. super();
  30. }
  31. /**
  32. * 创建会话。会话的ID来源:
  33. * MUC:患者的ID
  34. * P2P:对成员的ID排序后,取hash值
  35. * GROUP:团队的ID
  36. *
  37. * @param sessionId
  38. * @param name 会话名称
  39. * @param type 会话类型
  40. * @param participantArray 会话成员
  41. * @param handler 回调,仅MUC模式使用
  42. */
  43. createSession(sessionId, name, type, participantArray, handler) {
  44. let self = this;
  45. let messageId = mongoose.Types.ObjectId().toString();
  46. //创建session到mysql
  47. self.createSessionToMysql(sessionId, name, type, participantArray, messageId, function (err, res) {
  48. if (err) {
  49. if (handler) {
  50. handler(err, null);
  51. return;
  52. }
  53. ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
  54. } else {
  55. //创建session到redis
  56. self.createSessionToRedis(sessionId, name, type, participantArray, messageId, function (err, res) {
  57. if (err) {
  58. if (handler) {
  59. handler(err, null);
  60. return;
  61. }
  62. ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
  63. } else {
  64. //更新成为进行中的会话
  65. self.updateSessionStatus(sessionId,0);
  66. if (handler) {
  67. handler(null, res);
  68. return;
  69. }
  70. ModelUtil.emitOK(self.eventEmitter, {status: 200, data: res});
  71. }
  72. });
  73. }
  74. });
  75. }
  76. /**
  77. * 创建会话。REDIS
  78. * @param sessionId
  79. * @param name
  80. * @param type
  81. * @param participantArray
  82. * @param messageId
  83. * @param handler
  84. * @returns {boolean}
  85. */
  86. createSessionToRedis(sessionId, name, type, participantArray, messageId, handler) {
  87. let self = this;
  88. let messages = new Messages();
  89. let participantIdArray = [];
  90. for (let i in participantArray) {
  91. participantIdArray.push(participantArray[i].split(":")[0]);
  92. }
  93. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  94. if (sessionId) {
  95. callBusinessType(sessionId);
  96. return;
  97. }
  98. else if (participantIdArray.length != 2) {
  99. handler("P2P session only allow 2 participants.", null);
  100. return false;
  101. }else{
  102. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  103. sessionId = res;
  104. callBusinessType(sessionId);
  105. return;
  106. });
  107. }
  108. } else {
  109. if (!sessionId) {
  110. handler("MUC OR GROUP session sessionId is not allow null .", null);
  111. return;
  112. }
  113. callBusinessType(sessionId);
  114. }
  115. function callBusinessType(sessionId) {
  116. ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) {
  117. callCreate(sessionId, businessType);
  118. });
  119. }
  120. function callCreate(sessionId, businessType) {
  121. let createDate = new Date();
  122. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  123. let message = {
  124. sender_id: "system",
  125. sender_name: "system",
  126. content_type: 11,
  127. content: "会话创建成功",
  128. timestamp: createDate,
  129. id: messageId
  130. };
  131. if (type == SESSION_TYPES.MUC) {
  132. businessType = 2;
  133. }
  134. let session = {
  135. id: sessionId,
  136. name: name,
  137. type: type,
  138. create_date: createDate.getTime(),
  139. business_type: businessType,
  140. last_sender_id: message.sender_id,
  141. last_sender_name: message.sender_name,
  142. last_message_time: message.timestamp.getTime(),
  143. last_content: message.content,
  144. last_content_type: message.content_type
  145. };
  146. redis.hmsetAsync(sessionKey, session).then(function () {
  147. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  148. handler(null, session);
  149. //messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message);
  150. });
  151. })
  152. }
  153. }
  154. /**
  155. * 创建会话。mysql
  156. * @param sessionId
  157. * @param name
  158. * @param type
  159. * @param participantArray
  160. * @param messageId
  161. * @param handler
  162. */
  163. createSessionToMysql(sessionId, name, type, participantArray, messageId, handler) {
  164. let self = this;
  165. //如果sessionId不存在则执行创建sessionId过程
  166. let participantIdArray = [];
  167. for (let i in participantArray) {
  168. participantIdArray.push(participantArray[i].split(":")[0]);
  169. }
  170. //流程1-判断是否存在sessionId不存在则创建对应的sessionId;
  171. if (!sessionId) {
  172. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  173. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  174. sessionId = res;
  175. callBusinessType();
  176. });
  177. } else {
  178. return handler("MUC模式和团队模式,不允许sessionId为空!", null);
  179. }
  180. } else {
  181. callBusinessType();
  182. }
  183. //流程2-判断session的业务类型;
  184. function callBusinessType() {
  185. ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) {
  186. if (err) {
  187. handler(err, null);
  188. return;
  189. }
  190. callCreateSession(businessType);
  191. });
  192. }
  193. //流程3-发起session创建 返回session实例
  194. function callCreateSession(businessType) {
  195. //查找该sessionId是否存在存在则直接返回实例
  196. SessionRepo.findOne(sessionId, function (err, res) {
  197. if (res.length > 0) {//已经存在
  198. //更新成员
  199. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, update) {
  200. handler(null, res[0]);
  201. return;
  202. })
  203. } else {
  204. let createDate = new Date();
  205. let session = {
  206. id: sessionId,
  207. name: name,
  208. type: type,
  209. create_date: createDate.getTime(),
  210. business_type: businessType
  211. };
  212. //将session写入数据库
  213. self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
  214. if (err) {
  215. handler(err, null);
  216. return;
  217. }
  218. callCreateParticipants(session);
  219. })
  220. }
  221. });
  222. }
  223. //流程4-发起session成员创建
  224. function callCreateParticipants(session) {
  225. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  226. if (err) {
  227. handler(err, null);
  228. return;
  229. } else {
  230. handler(null, session);
  231. return;
  232. }
  233. })
  234. }
  235. }
  236. /**
  237. * 最近会话列表,7天内。
  238. *
  239. * @param userId
  240. * @param dateSpan
  241. */
  242. getRecentSessions(userId, dateSpan) {
  243. let self = this;
  244. SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) {
  245. if (err) {
  246. ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err);
  247. return;
  248. }
  249. let sessions = [];
  250. res.forEach(function (session) {
  251. sessions.push({
  252. id: session.id,
  253. name: session.name,
  254. type: session.type,
  255. business_type: session.business_type,
  256. create_date: session.create_date
  257. })
  258. });
  259. ModelUtil.emitOK(self.eventEmitter, sessions);
  260. });
  261. }
  262. /**
  263. * 保存session到MySQL
  264. * @param sessionId
  265. * @param name
  266. * @param type
  267. * @param createDate
  268. * @param businessType
  269. * @param handler
  270. */
  271. saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {
  272. SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler);
  273. }
  274. /**
  275. * 获取某个用户的全部session列表
  276. * @param userId
  277. * @param handler
  278. */
  279. getUserSessionsFromMysql(userId, handler) {
  280. SessionRepo.findAll(userId, handler);
  281. }
  282. /**
  283. * 获取session单个对象
  284. * @param sessionId
  285. * @param handler
  286. */
  287. getSessions(sessionId, handler) {
  288. SessionRepo.findOne(sessionId, handler);
  289. }
  290. /**
  291. * 根据用户ID获取用户的session列表
  292. * @param userId
  293. * @param page
  294. * @param size
  295. * @param businessType
  296. */
  297. getUserSessions(userId, page, size, businessType) {
  298. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  299. let self = this;
  300. if (page > 0) {
  301. if (page == 1) {
  302. page = 0;
  303. }
  304. page = page + page * size;
  305. }
  306. async.waterfall([
  307. // 获取会话ID列表
  308. function (callback) {
  309. redis.zrevrangeAsync(userSessionKey, page, size)
  310. .then(function (sessionIds) {
  311. if (sessionIds.length == 0) {
  312. ModelUtil.emitOK(self.eventEmitter, []);
  313. return;
  314. }
  315. callback(null, sessionIds);
  316. })
  317. },
  318. // 遍历会话
  319. function (sessionIds) {
  320. let sessionList = [];
  321. let functionList = [];
  322. for (let j = 0; j < sessionIds.length; j++) {
  323. let fun = function (index, callback) {
  324. if (!callback) {
  325. callback = index, index = 0
  326. }
  327. let sessionId = sessionIds[index];
  328. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  329. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  330. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  331. redis.multi()
  332. .hgetall(sessionKey) // 会话实体
  333. .hget(participantsRoleKey, userId) // 用户在此会话中的角色
  334. .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
  335. .zrange(sessionParticipantsKey, 0, -1)
  336. .zrange(sessionParticipantsKey, 0,-1,'withscores') // 所有用户在此会话中最后一次获取未读消息的时间
  337. .execAsync()
  338. .then(function (res) {
  339. let session = res[0];
  340. let role = res[1];
  341. let lastFetchTime = res[2];
  342. let users = res[3];
  343. let participantsTimeArray = res[4];
  344. let participantsTime = [];
  345. for(var j = 0 ;j<participantsTimeArray.length;j++){
  346. if(j%2!=0)continue;
  347. let participantsTimeJson = {};
  348. participantsTimeJson[participantsTimeArray[j]] = participantsTimeArray[j+1];
  349. participantsTime.push(participantsTimeJson);
  350. }
  351. let sessionName = "";
  352. let otherUserId = "";
  353. if (session.type == SESSION_TYPES.P2P) {
  354. for (let j in users) {
  355. if (users[j] != userId) {
  356. otherUserId = users[j];
  357. }
  358. }
  359. }
  360. if (!role) role = 0;
  361. if (!lastFetchTime) lastFetchTime = new Date().getTime();
  362. // 计算未读消息数
  363. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  364. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
  365. .then(function (count) {
  366. if (!otherUserId) otherUserId = userId;
  367. ParticipantRepo.findNameById(otherUserId, function (err, res) {
  368. if ((res && res.length == 0) || session.type != SESSION_TYPES.P2P) {
  369. sessionName = session.name;
  370. } else {
  371. sessionName = res[0].name;
  372. }
  373. var bir = new Date().getTime();
  374. if (res.length != 0 && res[0].birthdate) {
  375. bir = res[0].birthdate.getTime();
  376. }
  377. var sex = 1;
  378. if (res.length != 0 && res[0].sex) {
  379. sex = res[0].sex;
  380. }
  381. sessionList.push({
  382. id: sessionId,
  383. name: sessionName,
  384. create_date: session.create_date,
  385. last_content_type: session.last_content_type,
  386. last_content: session.last_content,
  387. sender_id: session.sender_id,
  388. type: session.type,
  389. sender_name: session.sender_name,
  390. unread_count: count,
  391. business_type: session.business_type,
  392. my_role: role,
  393. sender_sex: sex,
  394. sender_birthday: bir,
  395. participantsTimeArray:participantsTime,
  396. status:session.status
  397. });
  398. index = (parseInt(index) + 1);
  399. if (index == sessionIds.length) {
  400. ModelUtil.emitOK(self.eventEmitter, sessionList);
  401. } else {
  402. callback(null, index);
  403. }
  404. })
  405. })
  406. })
  407. .catch(function (err) {
  408. logger.error("Get sessions failed: ", err);
  409. });
  410. };
  411. functionList.push(fun);
  412. }
  413. async.waterfall(functionList);
  414. }
  415. ]);
  416. }
  417. /**
  418. * 获取会话消息。全部,不管已读/未读状态。
  419. *
  420. * @param sessionId 会话ID
  421. * @param userId 拉取消息的人
  422. * @param page 第几页
  423. * @param pagesize 分页数量
  424. * @param start_msg_id 消息会话最新的一条消息的ID
  425. * @param end_msg_id 消息会话刚开始的消息ID
  426. */
  427. getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, handler) {
  428. let self = this;
  429. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  430. if (!start_msg_id && !end_msg_id) {
  431. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  432. if (res.length == 0) {
  433. if (handler) {
  434. handler(null, res);
  435. return;
  436. }
  437. ModelUtil.emitOK(self.eventEmitter, res);
  438. return;
  439. }
  440. start_msg_id = res[0];
  441. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  442. if (res.length == 0) {
  443. if (handler) {
  444. handler(null, res);
  445. return;
  446. }
  447. ModelUtil.emitOK(self.eventEmitter, res);
  448. return;
  449. }
  450. end_msg_id = res[0];
  451. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  452. if (err) {
  453. if (handler) {
  454. handler(err, null);
  455. return;
  456. }
  457. logger.error("getMessagesByPage error" + err);
  458. ModelUtil.emitError(self.eventEmitter, err, err);
  459. } else {
  460. if (handler) {
  461. handler(null, res);
  462. return;
  463. }
  464. ModelUtil.emitOK(self.eventEmitter, res);
  465. }
  466. })
  467. })
  468. })
  469. } else if (!start_msg_id) {
  470. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  471. if (res.length == 0) {
  472. if (handler) {
  473. handler(null, res);
  474. return;
  475. }
  476. ModelUtil.emitOK(self.eventEmitter, res);
  477. return;
  478. }
  479. start_msg_id = res[0];
  480. self.getMessagesByPage(sessionId, user, start_msg_id,end_msg_id , page, pagesize, isoffset, function (err, res) {
  481. if (err) {
  482. if (handler) {
  483. handler(err, null);
  484. return;
  485. }
  486. logger.error("getMessagesByPage error" + err);
  487. ModelUtil.emitError(self.eventEmitter, err, err);
  488. } else {
  489. if (handler) {
  490. handler(null, res);
  491. return;
  492. }
  493. ModelUtil.emitOK(self.eventEmitter, res);
  494. }
  495. })
  496. })
  497. } else if (!end_msg_id) {
  498. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  499. if (res.length == 0) {
  500. ModelUtil.emitOK(self.eventEmitter, res);
  501. return;
  502. }
  503. end_msg_id = res[0];
  504. self.getMessagesByPage(sessionId, user, end_msg_id,start_msg_id, page, pagesize, isoffset, function (err, res) {
  505. if (err) {
  506. if (handler) {
  507. handler(err, null);
  508. return;
  509. }
  510. logger.error("getMessagesByPage error" + err);
  511. ModelUtil.emitError(self.eventEmitter, err, err);
  512. } else {
  513. if (handler) {
  514. handler(null, res);
  515. return;
  516. }
  517. ModelUtil.emitOK(self.eventEmitter, res);
  518. }
  519. })
  520. })
  521. } else {
  522. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  523. if (err) {
  524. if (handler) {
  525. handler(err, null);
  526. return;
  527. }
  528. logger.error("getMessagesByPage error" + err);
  529. ModelUtil.emitError(self.eventEmitter, err, err);
  530. } else {
  531. if (handler) {
  532. handler(null, res);
  533. return;
  534. }
  535. ModelUtil.emitOK(self.eventEmitter, res);
  536. }
  537. })
  538. }
  539. }
  540. /**
  541. * 分页获取会话消息。
  542. *
  543. * @param sessionId 必选。会话ID
  544. * @param userId 必选。用户ID
  545. * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据
  546. * @param endMsgId 必选。会话中的结束消息ID
  547. * @param page 必选。页码
  548. * @param size 必选。页面大小
  549. * @param handler 必选。回调
  550. */
  551. getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
  552. let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  553. let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  554. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  555. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  556. let participants = new Participants();
  557. let offset = (page - 1 < 0 ? 0 : page - 1) * size;
  558. let count = size;
  559. if (page > 1 || isoffset == 1) {
  560. offset += 1; // 翻页由于闭区间,需跳过本身数据
  561. }
  562. participants.existsParticipant(sessionId, userId, function (err, res) {
  563. if (!res) {
  564. handler(Error("User not found in session " + sessionId), null);
  565. } else {
  566. //将消息ID转换成分值
  567. redis.multi()
  568. .zscore(messagesTimestampKey, startMsgId)
  569. .zscore(messagesTimestampKey, endMsgId)
  570. .hgetall(sessionKey)
  571. .zrange(sessionParticipantsKey, 0, -1)
  572. .execAsync()
  573. .then(function (res) {
  574. let startMsgScore = res[1];
  575. let endMsgScore = res[0];
  576. let session = res[2];
  577. let users = res[3];
  578. if (startMsgScore == null || endMsgScore == null || (startMsgScore == endMsgScore && isoffset == 1)) {
  579. handler(null, []);
  580. return;
  581. }
  582. if(endMsgScore>startMsgScore){
  583. redis.zrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  584. .then(function (res) {
  585. if (res.length == 0) {
  586. handler(null, []);
  587. return;
  588. }
  589. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  590. messages.reverse();
  591. handler(null, messages);
  592. }).then(function () {
  593. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  594. if(session.type != SESSION_TYPES.GROUP){
  595. for(var j in users){
  596. if(users[j]==userId)continue;
  597. WechatClient.sendAllRead(users[j]);
  598. }
  599. }
  600. })
  601. })
  602. .catch(function (err) {
  603. logger.error("Get message by page failed: ", err);
  604. handler(err, false);
  605. })
  606. }else{
  607. // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
  608. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  609. .then(function (res) {
  610. if (res.length == 0) {
  611. handler(null, []);
  612. return;
  613. }
  614. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  615. handler(null, messages);
  616. }).then(function () {
  617. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  618. })
  619. })
  620. .catch(function (err) {
  621. logger.error("Get message by page failed: ", err);
  622. handler(err, false);
  623. })
  624. }
  625. })
  626. }
  627. })
  628. }
  629. /**
  630. * 获取所有会话的未读消息数。
  631. */
  632. getAllSessionsUnreadMessageCount(userId) {
  633. let self = this;
  634. let count = 0;
  635. let patientCount = 0;
  636. let doctorCount = 0;
  637. SessionRepo.findAll(userId, function (err, res) {
  638. if (err) {
  639. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  640. return;
  641. }
  642. if (res.length == 0) {
  643. ModelUtil.emitOK(self.eventEmitter, {count: count});
  644. return;
  645. }
  646. for (let j in res) {
  647. if (res[j].type == SESSION_TYPES.SYSTEM) {
  648. if (j == res.length - 1) {
  649. ModelUtil.emitOK(self.eventEmitter, {count: count});
  650. }
  651. continue;
  652. }
  653. callback(res, j, res[j]);
  654. }
  655. });
  656. function callback(res, j, session) {
  657. self.getSessionUnreadMessageCount(res[j].id, userId, function (err, con) {
  658. if (err) {
  659. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  660. }
  661. count = count + con;
  662. if (session.type == 2) {
  663. patientCount = patientCount + con;
  664. } else {
  665. doctorCount = doctorCount + con;
  666. }
  667. if (j == res.length - 1) {
  668. ModelUtil.emitOK(self.eventEmitter, {count: count, patient: patientCount, doctor: doctorCount});
  669. }
  670. })
  671. }
  672. }
  673. /**
  674. * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
  675. *
  676. * @param sessionId
  677. * @param userId
  678. */
  679. getSessionUnreadMessageCount(sessionId, userId, handler) {
  680. let self = this;
  681. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  682. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  683. async.waterfall([
  684. // 此成员最后获取消息的时间
  685. function (callback) {
  686. redis.zscoreAsync(participantsKey, userId)
  687. .then(function (lastFetchTime) {
  688. callback(null, lastFetchTime);
  689. })
  690. },
  691. // 计算最后获取消息的时间之后到现在有多少条消息
  692. function (lastFetchTime, callback) {
  693. if (!lastFetchTime) lastFetchTime = 0;
  694. let now = new Date().getTime();
  695. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
  696. .then(function (count) {
  697. if (handler) {
  698. handler(null, count);
  699. } else {
  700. ModelUtil.emitOK(self.eventEmitter, {count: count});
  701. }
  702. })
  703. }
  704. ], function (err, res) {
  705. if (err) {
  706. if (handler) {
  707. handler(err, 0);
  708. } else {
  709. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  710. }
  711. }
  712. });
  713. }
  714. /**
  715. * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
  716. */
  717. getSessionUnreadMessages(sessionId, userId) {
  718. let self = this;
  719. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  720. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  721. async.waterfall([
  722. // 此成员最后获取消息的时间
  723. function (callback) {
  724. redis.zscoreAsync(participantsKey, userId)
  725. .then(function (lastFetchTime) {
  726. callback(null, lastFetchTime);
  727. })
  728. },
  729. // 最后获取消息的时间之后到现在的消息ID列表
  730. function (lastFetchTime, callback) {
  731. if (!lastFetchTime) lastFetchTime = 0;
  732. let now = new Date().getTime();
  733. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
  734. .then(function (messageIds) {
  735. callback(null, messageIds);
  736. })
  737. },
  738. // 获取消息
  739. function (messageIds, callback) {
  740. if (messageIds.length == 0) {
  741. ModelUtil.emitOK(self.eventEmitter, []);
  742. return;
  743. }
  744. let startMsgId = messageIds[0];
  745. let endMsgId = messageIds[messageIds.length - 1];
  746. self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
  747. if (err) {
  748. ModelUtil.emitError(self.eventEmitter, err.message);
  749. return;
  750. }
  751. ModelUtil.emitOK(self.eventEmitter, res);
  752. });
  753. }
  754. ], function (err, res) {
  755. if (err) {
  756. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  757. }
  758. });
  759. }
  760. /**
  761. * 保存消息。
  762. *
  763. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  764. *
  765. * see also: saveMessageByTopic
  766. *
  767. * @param message
  768. * @param sessionId
  769. */
  770. saveMessageBySession(sessionId, message) {
  771. let self = this;
  772. let messages = new Messages();
  773. let participants = new Participants();
  774. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  775. let messageId = mongoose.Types.ObjectId().toString();
  776. message.id = messageId;
  777. // 检查会话中是否存在此成员
  778. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  779. if (err) {
  780. ModelUtil.emitError(self.eventEmitter, "Check session participant failed: ", err);
  781. return;
  782. }
  783. if (res) {
  784. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  785. let sessionType = res[0];
  786. let sessionName = res[1];
  787. if (sessionType == null) {
  788. ModelUtil.emitError(self.eventEmitter, "Session " + sessionId + " is not found.");
  789. return;
  790. }
  791. // 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间
  792. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  793. Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
  794. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  795. // 更新MYSQL中会话的最新状态,并保存消息
  796. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  797. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  798. if (err) {
  799. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  800. } else {
  801. message.timestamp = message.timestamp.getTime();
  802. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  803. }
  804. });
  805. }).then(function (res) {
  806. // 推送消息
  807. ParticipantRepo.findIds(sessionId, function (err, res) {
  808. if (err) {
  809. ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
  810. } else {
  811. message.session_id = sessionId;
  812. res.forEach(function (participant) {
  813. if (participant.id !== message.sender_id &&
  814. participant.participant_role == PARTICIPANT_ROLES.HOST) {
  815. Sessions.pushNotification(participant.id, participant.name, message);
  816. }
  817. });
  818. }
  819. })
  820. }).catch(function (err) {
  821. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  822. })
  823. } else {
  824. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  825. }
  826. });
  827. }
  828. sendTopicMessages(topicId, message) {
  829. let self = this;
  830. TopicRepo.findAllByTopicId(topicId, function (err, res) {
  831. if (err || res.length == 0) {
  832. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": "议题获取失败"});
  833. return;
  834. }
  835. self.saveMessageByTopic(message, res[0].session_id, function (err, messageId) {
  836. if (err) {
  837. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": err});
  838. } else {
  839. message.id = messageId;
  840. ModelUtil.emitOK(self.eventEmitter, {status: 200, "message": "发送成功", data: message});
  841. }
  842. });
  843. });
  844. }
  845. /**
  846. * 保存消息
  847. *
  848. * @param message
  849. * @param sessionId
  850. * @param handler
  851. */
  852. saveMessageByTopic(message, sessionId, handler) {
  853. let messages = new Messages();
  854. let participants = new Participants();
  855. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  856. let messageId = mongoose.Types.ObjectId().toString();
  857. let self = this;
  858. let sessionType = 0;
  859. let sessionName = "";
  860. message.id = messageId;
  861. // 发送成员必须处于会话中
  862. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  863. if (res) {
  864. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  865. sessionType = res[0];
  866. sessionName = res[1];
  867. if (!sessionType || !sessionName) {
  868. logger.error("Unknown session key " + session_key);
  869. if (handler) {handler(new Error("Unknown session key " + session_key));return;} ;
  870. }
  871. }).then(function (res) {
  872. // 消息数据双写,并更新用户最后消息获取时间,会话新状态等
  873. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  874. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  875. // 更新会话最新状态及成员最后一次消息获取时间
  876. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  877. Messages.updateLastContent(session_key, sessionType, sessionName, message);
  878. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  879. if (handler) {handler(null, messageId);return;}
  880. }).then(function (res) {
  881. // 推送消息
  882. ParticipantRepo.findIds(sessionId, function (err, res) {
  883. if (err) {
  884. logger.error(err);
  885. } else {
  886. message.session_id = sessionId;
  887. res.forEach(function (participant) {
  888. if (participant.id !== message.sender_id &&
  889. participant.participant_role == PARTICIPANT_ROLES.HOST) {
  890. Sessions.pushNotification(participant.id, participant.name, message);
  891. }
  892. });
  893. }
  894. })
  895. }).catch(function (err) {
  896. if (handler) { handler(err, messageId);return;}
  897. })
  898. } else {
  899. if (handler){ handler("用户不在此会话当中!", messageId);return;}
  900. }
  901. })
  902. }
  903. /**
  904. * 置顶操作
  905. */
  906. stickSession(sessionId, user) {
  907. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  908. let self = this;
  909. //取出最大的session
  910. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  911. //获取该session的时间搓
  912. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  913. let nowtime = new Date().getTime();
  914. //当前时间搓比redis的时间搓更早证明没有置顶过
  915. if (scoreres <= nowtime) {
  916. //初始化置顶
  917. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  918. logger.info("stickSession:" + sessionId + ",res:" + res);
  919. ModelUtil.emitOK(self.eventEmitter, {});
  920. }).then(function () {
  921. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  922. })
  923. } else {
  924. //已有置顶的数据,取出来加1保存回去
  925. scoreres = Number(scoreres) + 1;
  926. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  927. logger.info("stickSession:" + sessionId + ",res:" + res);
  928. ModelUtil.emitOK(self.eventEmitter, {});
  929. }).then(function () {
  930. SessionRepo.saveStickySession(sessionId, user, scoreres);
  931. })
  932. }
  933. })
  934. })
  935. }
  936. /**
  937. * 取消会话置顶
  938. */
  939. cancelStickSession(sessionId, user) {
  940. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  941. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  942. let self = this;
  943. redis.zscoreAsync(participants_key, user).then(function (res) {
  944. if (!res) {
  945. res = new Date().getTime();
  946. }
  947. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  948. logger.info("cancelStickSession:" + sessionId);
  949. ModelUtil.emitOK(self.eventEmitter, {});
  950. }).then(function () {
  951. SessionRepo.unstickSession(sessionId, user);
  952. });
  953. })
  954. }
  955. /**
  956. * 更新会话参与者的最后消息获取时间。
  957. *
  958. * @param sessionId
  959. * @param userId
  960. */
  961. static updateParticipantLastFetchTime(sessionId, userId, score) {
  962. score = score + 1;
  963. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  964. redis.zaddAsync(participantsKey, score, userId)
  965. .then(function (res) {
  966. ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) {
  967. if (err) {
  968. logger.error("Update participant last fetch time failed: ", err);
  969. }
  970. });
  971. })
  972. .catch(function (err) {
  973. logger.error("Update participant last fetch time failed: ", err);
  974. });
  975. }
  976. /**
  977. * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。
  978. *
  979. * @param targetUserId
  980. * @param message
  981. */
  982. static pushNotification(targetUserId, targetUserName, message) {
  983. Users.isPatientId(targetUserId, function (err, isPatient) {
  984. if (isPatient) {
  985. WechatClient.sendMessage(targetUserId, targetUserName, message);
  986. }
  987. else {
  988. WechatClient.sendReadDoctorByDoctorId(targetUserId, message);
  989. AppClient.sendNotification(targetUserId, message);
  990. }
  991. });
  992. }
  993. /**
  994. * 针对MUC模式更新会话的当前状态
  995. * @param sessionId
  996. */
  997. updateSessionStatus(sessionId,status){
  998. let self = this;
  999. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session,sessionId);
  1000. redis.hsetAsync(sessionKey,"status",status).then(function(res){
  1001. SessionRepo.updateSessionStatus(sessionId,status,function(err,sqlResult){
  1002. if(err){
  1003. logger.error("set session status to mysql is error !");
  1004. }else{
  1005. logger.info("set session status is success");
  1006. }
  1007. });
  1008. });
  1009. }
  1010. }
  1011. // Expose class
  1012. module.exports = Sessions;