sessions.js 42 KB


  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Users = require('../user/users');
  10. let Participants = require('./participants');
  11. let SessionRepo = require('../../repository/mysql/session.repo');
  12. let TopicRepo = require('../../repository/mysql/topics.repo');
  13. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  14. let WechatClient = require("../client/wechat.client.js");
  15. let AppClient = require("../client/app.client.js");
  16. let configFile = require('../../include/commons').CONFIG_FILE;
  17. let config = require('../../resources/config/' + configFile);
  18. let redis = RedisClient.redisClient().connection;
  19. let logger = require('../../util/log.js');
  20. let mongoose = require('mongoose');
  21. let async = require("async");
  22. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  23. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  24. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  25. class Sessions extends RedisModel {
  26. constructor() {
  27. super();
  28. }
  29. /**
  30. * 创建会话。会话的ID来源:
  31. * MUC:患者的ID
  32. * P2P:对成员的ID排序后,取hash值
  33. * GROUP:团队的ID
  34. *
  35. * @param sessionId
  36. * @param name 会话名称
  37. * @param type 会话类型
  38. * @param participantArray 会话成员
  39. * @param handler 回调,仅MUC模式使用
  40. */
  41. createSession(sessionId, name, type, participantArray, handler) {
  42. let self = this;
  43. let messageId = mongoose.Types.ObjectId().toString();
  44. //创建session到mysql
  45. self.createSessionToMysql(sessionId, name, type, participantArray, messageId, function (err, res) {
  46. if (err) {
  47. ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
  48. } else {
  49. //创建session到redis
  50. self.createSessionToRedis(sessionId, name, type, participantArray, messageId, function (err, res) {
  51. if (err) {
  52. if (handler) {
  53. handler(err, null);
  54. return;
  55. }
  56. ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
  57. } else {
  58. if (handler) {
  59. handler(null, res);
  60. return;
  61. }
  62. ModelUtil.emitOK(self.eventEmitter, {status: 200, data: res});
  63. }
  64. });
  65. }
  66. });
  67. }
  68. /**
  69. * 创建会话。REDIS
  70. * @param sessionId
  71. * @param name
  72. * @param type
  73. * @param participantArray
  74. * @param messageId
  75. * @param handler
  76. * @returns {boolean}
  77. */
  78. createSessionToRedis(sessionId, name, type, participantArray, messageId, handler) {
  79. let self = this;
  80. let messages = new Messages();
  81. let participantIdArray = [];
  82. for (let i in participantArray) {
  83. participantIdArray.push(participantArray[i].split(":")[0]);
  84. }
  85. logger.error("create session by participantIdArray,:"+participantIdArray.join(","));
  86. logger.error("create session by type,:"+type);
  87. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  88. if (sessionId) {
  89. callBusinessType(sessionId);
  90. return;
  91. }
  92. if (participantIdArray.length != 2) {
  93. handler("P2P session only allow 2 participants.", null);
  94. return false;
  95. }
  96. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  97. sessionId = res;
  98. callBusinessType(sessionId);
  99. });
  100. } else {
  101. if (!sessionId) {
  102. handler("MUC OR GROUP session sessionId is not allow null .", null);
  103. return;
  104. }
  105. callBusinessType(sessionId);
  106. }
  107. function callBusinessType(sessionId) {
  108. ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) {
  109. callCreate(sessionId, businessType);
  110. });
  111. }
  112. function callCreate(sessionId, businessType) {
  113. let createDate = new Date();
  114. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  115. let message = {
  116. sender_id: "system",
  117. sender_name: "system",
  118. content_type: 11,
  119. content: "会话创建成功",
  120. timestamp: createDate,
  121. id: messageId
  122. };
  123. if(type == SESSION_TYPES.MUC){
  124. businessType = 2;
  125. }
  126. let session = {
  127. id: sessionId,
  128. name: name,
  129. type: type,
  130. create_date: createDate.getTime(),
  131. business_type: businessType,
  132. last_sender_id: message.sender_id,
  133. last_sender_name: message.sender_name,
  134. last_message_time: message.timestamp.getTime(),
  135. last_content: message.content,
  136. last_content_type: message.content_type
  137. };
  138. redis.hmsetAsync(sessionKey, session).then(function () {
  139. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  140. handler(null, session);
  141. //messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message);
  142. });
  143. })
  144. }
  145. }
  146. /**
  147. * 创建会话。mysql
  148. * @param sessionId
  149. * @param name
  150. * @param type
  151. * @param participantArray
  152. * @param messageId
  153. * @param handler
  154. */
  155. createSessionToMysql(sessionId, name, type, participantArray, messageId, handler) {
  156. let self = this;
  157. //如果sessionId不存在则执行创建sessionId过程
  158. let participantIdArray = [];
  159. for (let i in participantArray) {
  160. participantIdArray.push(participantArray[i].split(":")[0]);
  161. }
  162. //流程1-判断是否存在sessionId不存在则创建对应的sessionId;
  163. if (!sessionId) {
  164. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  165. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  166. sessionId = res;
  167. callBusinessType();
  168. });
  169. } else {
  170. return handler("MUC模式和团队模式,不允许sessionId为空!", null);
  171. }
  172. } else {
  173. callBusinessType();
  174. }
  175. //流程2-判断session的业务类型;
  176. function callBusinessType() {
  177. ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) {
  178. if (err) {
  179. handler(err, null);
  180. return;
  181. }
  182. callCreateSession(businessType);
  183. });
  184. }
  185. //流程3-发起session创建 返回session实例
  186. function callCreateSession(businessType) {
  187. //查找该sessionId是否存在存在则直接返回实例
  188. SessionRepo.findOne(sessionId, function (err, res) {
  189. if (res.length > 0) {//已经存在
  190. for (let i in participantArray) {
  191. //更新成员
  192. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, update) {
  193. handler(null, res[0]);
  194. })
  195. }
  196. } else {
  197. let createDate = new Date();
  198. let session = {
  199. id: sessionId,
  200. name: name,
  201. type: type,
  202. create_date: createDate.getTime(),
  203. business_type: businessType
  204. };
  205. //将session写入数据库
  206. self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
  207. if (err) {
  208. handler(err, null);
  209. return;
  210. }
  211. ;
  212. callCreateParticipants(session);
  213. })
  214. }
  215. });
  216. }
  217. //流程4-发起session成员创建
  218. function callCreateParticipants(session) {
  219. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  220. if (err) {
  221. handler(err, null);
  222. return;
  223. }
  224. // callBeginTrans(session);
  225. })
  226. }
  227. //流程5-发起session会话
  228. function callBeginTrans(session) {
  229. let mesDate = new Date();
  230. let message = {
  231. sender_id: "system",
  232. sender_name: "system",
  233. content_type: 6,
  234. content: "会话创建成功",
  235. timestamp: mesDate,
  236. id: messageId
  237. };
  238. session.last_sender_id = message.sender_id;
  239. session.last_sender_name = message.sender_name;
  240. session.last_message_time = mesDate.getTime();
  241. session.last_content = message.content;
  242. session.last_content_type = message.content_type;
  243. SessionRepo.updateSessionLastStatus(message.sender_id,
  244. message.sender_name,
  245. message.timestamp,
  246. message.content,
  247. message.content_type,
  248. sessionId, function (err, res) {
  249. if (err) {
  250. handler(err, null);
  251. return;
  252. }
  253. handler(null, session);
  254. });
  255. }
  256. }
  257. /**
  258. * 最近会话列表,7天内。
  259. *
  260. * @param userId
  261. * @param dateSpan
  262. */
  263. getRecentSessions(userId, dateSpan) {
  264. let self = this;
  265. SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) {
  266. if (err) {
  267. ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err);
  268. return;
  269. }
  270. let sessions = [];
  271. res.forEach(function (session) {
  272. sessions.push({
  273. id: session.id,
  274. name: session.name,
  275. type: session.type,
  276. business_type: session.business_type,
  277. create_date: session.create_date
  278. })
  279. });
  280. ModelUtil.emitOK(self.eventEmitter, sessions);
  281. });
  282. }
  283. /**
  284. * 保存session到MySQL
  285. * @param sessionId
  286. * @param name
  287. * @param type
  288. * @param createDate
  289. * @param businessType
  290. * @param handler
  291. */
  292. saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {
  293. SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler);
  294. }
  295. /**
  296. * 获取某个用户的全部session列表
  297. * @param userId
  298. * @param handler
  299. */
  300. getUserSessionsFromMysql(userId, handler) {
  301. SessionRepo.findAll(userId, handler);
  302. }
  303. /**
  304. * 获取session单个对象
  305. * @param sessionId
  306. * @param handler
  307. */
  308. getSessions(sessionId, handler) {
  309. SessionRepo.findOne(sessionId, handler);
  310. }
  311. /**
  312. * 根据用户ID获取用户的session列表
  313. * @param userId
  314. * @param page
  315. * @param size
  316. * @param businessType
  317. */
  318. getUserSessions(userId, page, size, businessType) {
  319. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  320. let self = this;
  321. if (page > 0) {
  322. if (page == 1) {
  323. page = 0;
  324. }
  325. page = page + page * size;
  326. }
  327. async.waterfall([
  328. // 获取会话ID列表
  329. function (callback) {
  330. redis.zrevrangeAsync(userSessionKey, page, size)
  331. .then(function (sessionIds) {
  332. if (sessionIds.length == 0) {
  333. ModelUtil.emitOK(self.eventEmitter, []);
  334. return;
  335. }
  336. callback(null, sessionIds);
  337. })
  338. },
  339. // 遍历会话
  340. function (sessionIds) {
  341. let sessionList = [];
  342. let functionList = [];
  343. for (let j = 0; j < sessionIds.length; j++) {
  344. let fun = function (index, callback) {
  345. if (!callback) {
  346. callback = index, index = 0
  347. }
  348. let sessionId = sessionIds[index];
  349. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  350. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  351. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  352. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  353. redis.multi()
  354. .hgetall(sessionKey) // 会话实体
  355. .hget(participantsRoleKey, userId) // 用户在此会话中的角色
  356. .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
  357. .zrange(participantsKey, 0, -1)
  358. .execAsync()
  359. .then(function (res) {
  360. let session = res[0];
  361. let role = res[1];
  362. let lastFetchTime = res[2];
  363. let users = res[3];
  364. let sessionName = "";
  365. let otherUserId = "";
  366. if (session.type == SESSION_TYPES.P2P) {
  367. for (let j in users) {
  368. if (users[j] != userId) {
  369. otherUserId = users[j];
  370. }
  371. }
  372. }
  373. if (!role) role = 0;
  374. if (!lastFetchTime) lastFetchTime = new Date().getTime();
  375. // 计算未读消息数
  376. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  377. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
  378. .then(function (count) {
  379. if (!otherUserId) otherUserId = userId;
  380. ParticipantRepo.findNameById(otherUserId, function (err, res) {
  381. if ((res && res.length == 0) || session.type != SESSION_TYPES.P2P) {
  382. sessionName = session.name;
  383. } else {
  384. sessionName = res[0].name;
  385. }
  386. var bir = new Date().getTime();
  387. if(res.length!=0&&res[0].birthdate){
  388. bir = res[0].birthdate.getTime();
  389. }
  390. var sex = 1;
  391. if(res.length!=0&&res[0].sex){
  392. sex = res[0].sex;
  393. }
  394. sessionList.push({
  395. id: sessionId,
  396. name: sessionName,
  397. create_date: session.create_date,
  398. last_content_type: session.last_content_type,
  399. last_content: session.last_content,
  400. sender_id: session.sender_id,
  401. type: session.type,
  402. sender_name: session.sender_name,
  403. unread_count: count,
  404. business_type: session.business_type,
  405. my_role: role,
  406. sender_sex:sex,
  407. sender_birthday:bir,
  408. });
  409. index = (parseInt(index) + 1);
  410. if (index == sessionIds.length) {
  411. ModelUtil.emitOK(self.eventEmitter, sessionList);
  412. } else {
  413. callback(null, index);
  414. }
  415. })
  416. })
  417. })
  418. .catch(function (err) {
  419. logger.error("Get sessions failed: ", err);
  420. });
  421. };
  422. functionList.push(fun);
  423. }
  424. async.waterfall(functionList);
  425. }
  426. ]);
  427. }
  428. /**
  429. * 获取会话消息。全部,不管已读/未读状态。
  430. *
  431. * @param sessionId 会话ID
  432. * @param userId 拉取消息的人
  433. * @param page 第几页
  434. * @param pagesize 分页数量
  435. * @param start_msg_id 消息会话最新的一条消息的ID
  436. * @param end_msg_id 消息会话刚开始的消息ID
  437. */
  438. getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, handler) {
  439. let self = this;
  440. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  441. if (!start_msg_id && !end_msg_id) {
  442. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  443. if (res.length == 0) {
  444. if (handler) {
  445. handler(null, res);
  446. return;
  447. }
  448. ModelUtil.emitOK(self.eventEmitter, res);
  449. return;
  450. }
  451. start_msg_id = res[0];
  452. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  453. if (res.length == 0) {
  454. if (handler) {
  455. handler(null, res);
  456. return;
  457. }
  458. ModelUtil.emitOK(self.eventEmitter, res);
  459. return;
  460. }
  461. end_msg_id = res[0];
  462. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  463. if (err) {
  464. if (handler) {
  465. handler(err, null);
  466. return;
  467. }
  468. logger.error("getMessagesByPage error" + err);
  469. ModelUtil.emitError(self.eventEmitter, err, err);
  470. } else {
  471. if (handler) {
  472. handler(null, res);
  473. return;
  474. }
  475. ModelUtil.emitOK(self.eventEmitter, res);
  476. }
  477. })
  478. })
  479. })
  480. } else if (!start_msg_id) {
  481. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  482. if (res.length == 0) {
  483. if (handler) {
  484. handler(null, res);
  485. return;
  486. }
  487. ModelUtil.emitOK(self.eventEmitter, res);
  488. return;
  489. }
  490. start_msg_id = res[0];
  491. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  492. if (err) {
  493. if (handler) {
  494. handler(err, null);
  495. return;
  496. }
  497. logger.error("getMessagesByPage error" + err);
  498. ModelUtil.emitError(self.eventEmitter, err, err);
  499. } else {
  500. if (handler) {
  501. handler(null, res);
  502. return;
  503. }
  504. ModelUtil.emitOK(self.eventEmitter, res);
  505. }
  506. })
  507. })
  508. } else if (!end_msg_id) {
  509. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  510. if (res.length == 0) {
  511. ModelUtil.emitOK(self.eventEmitter, res);
  512. return;
  513. }
  514. end_msg_id = res[0];
  515. self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, function (err, res) {
  516. if (err) {
  517. if (handler) {
  518. handler(err, null);
  519. return;
  520. }
  521. logger.error("getMessagesByPage error" + err);
  522. ModelUtil.emitError(self.eventEmitter, err, err);
  523. } else {
  524. if (handler) {
  525. handler(null, res);
  526. return;
  527. }
  528. ModelUtil.emitOK(self.eventEmitter, res);
  529. }
  530. })
  531. })
  532. } else {
  533. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  534. if (err) {
  535. if (handler) {
  536. handler(err, null);
  537. return;
  538. }
  539. logger.error("getMessagesByPage error" + err);
  540. ModelUtil.emitError(self.eventEmitter, err, err);
  541. } else {
  542. if (handler) {
  543. handler(null, res);
  544. return;
  545. }
  546. ModelUtil.emitOK(self.eventEmitter, res);
  547. }
  548. })
  549. }
  550. }
  551. /**
  552. * 分页获取会话消息。
  553. *
  554. * @param sessionId 必选。会话ID
  555. * @param userId 必选。用户ID
  556. * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据
  557. * @param endMsgId 必选。会话中的结束消息ID
  558. * @param page 必选。页码
  559. * @param size 必选。页面大小
  560. * @param handler 必选。回调
  561. */
  562. getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
  563. let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  564. let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  565. let participants = new Participants();
  566. let offset = (page - 1 < 0 ? 0 : page - 1) * size;
  567. let count = size;
  568. if (page > 1 || isoffset == 1) {
  569. offset += 1; // 翻页由于闭区间,需跳过本身数据
  570. }
  571. participants.existsParticipant(sessionId, userId, function (err, res) {
  572. if (!res) {
  573. handler(Error("User not found in session " + sessionId), null);
  574. } else {
  575. //将消息ID转换成分值
  576. redis.multi()
  577. .zscore(messagesTimestampKey, startMsgId)
  578. .zscore(messagesTimestampKey, endMsgId)
  579. .execAsync()
  580. .then(function (res) {
  581. let startMsgScore = res[1];
  582. let endMsgScore = res[0];
  583. if (startMsgScore == null || endMsgScore == null || startMsgScore == endMsgScore) {
  584. handler(null, []);
  585. return;
  586. }
  587. // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
  588. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  589. .then(function (res) {
  590. if (res.length == 0) {
  591. handler(null, []);
  592. return;
  593. }
  594. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  595. handler(null, messages);
  596. }).then(function () {
  597. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  598. })
  599. })
  600. .catch(function (err) {
  601. logger.error("Get message by page failed: ", err);
  602. handler(err, false);
  603. })
  604. })
  605. }
  606. })
  607. }
  608. /**
  609. * 获取所有会话的未读消息数。
  610. */
  611. getAllSessionsUnreadMessageCount(userId) {
  612. let self = this;
  613. let count = 0;
  614. let patientCount = 0;
  615. let doctorCount = 0;
  616. SessionRepo.findAll(userId, function (err, res) {
  617. if (err) {
  618. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  619. return;
  620. }
  621. if (res.length == 0) {
  622. ModelUtil.emitOK(self.eventEmitter, {count: count});
  623. return;
  624. }
  625. for (let j in res) {
  626. if (res[j].type == SESSION_TYPES.SYSTEM) {
  627. if (j == res.length - 1) {
  628. ModelUtil.emitOK(self.eventEmitter, {count: count});
  629. }
  630. continue;
  631. }
  632. callback(res, j, res[j]);
  633. }
  634. });
  635. function callback(res, j, session) {
  636. self.getSessionUnreadMessageCount(res[j].id, userId, function (err, con) {
  637. if (err) {
  638. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  639. }
  640. count = count + con;
  641. if(session.type ==2){
  642. patientCount = patientCount+con;
  643. }else{
  644. doctorCount = doctorCount+con;
  645. }
  646. if (j == res.length - 1) {
  647. ModelUtil.emitOK(self.eventEmitter, {count: count,patient:patientCount,doctor:doctorCount});
  648. }
  649. })
  650. }
  651. }
  652. /**
  653. * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
  654. *
  655. * @param sessionId
  656. * @param userId
  657. */
  658. getSessionUnreadMessageCount(sessionId, userId, handler) {
  659. let self = this;
  660. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  661. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  662. async.waterfall([
  663. // 此成员最后获取消息的时间
  664. function (callback) {
  665. redis.zscoreAsync(participantsKey, userId)
  666. .then(function (lastFetchTime) {
  667. callback(null, lastFetchTime);
  668. })
  669. },
  670. // 计算最后获取消息的时间之后到现在有多少条消息
  671. function (lastFetchTime, callback) {
  672. if (!lastFetchTime) lastFetchTime = 0;
  673. let now = new Date().getTime();
  674. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
  675. .then(function (count) {
  676. if (handler) {
  677. handler(null, count);
  678. } else {
  679. ModelUtil.emitOK(self.eventEmitter, {count: count});
  680. }
  681. })
  682. }
  683. ], function (err, res) {
  684. if (err) {
  685. if (handler) {
  686. handler(err, 0);
  687. } else {
  688. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  689. }
  690. }
  691. });
  692. }
  693. /**
  694. * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
  695. */
  696. getSessionUnreadMessages(sessionId, userId) {
  697. let self = this;
  698. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  699. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  700. async.waterfall([
  701. // 此成员最后获取消息的时间
  702. function (callback) {
  703. redis.zscoreAsync(participantsKey, userId)
  704. .then(function (lastFetchTime) {
  705. callback(null, lastFetchTime);
  706. })
  707. },
  708. // 最后获取消息的时间之后到现在的消息ID列表
  709. function (lastFetchTime, callback) {
  710. if (!lastFetchTime) lastFetchTime = 0;
  711. let now = new Date().getTime();
  712. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
  713. .then(function (messageIds) {
  714. callback(null, messageIds);
  715. })
  716. },
  717. // 获取消息
  718. function (messageIds, callback) {
  719. if (messageIds.length == 0) {
  720. ModelUtil.emitOK(self.eventEmitter, []);
  721. return;
  722. }
  723. let startMsgId = messageIds[0];
  724. let endMsgId = messageIds[messageIds.length - 1];
  725. self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
  726. if (err) {
  727. ModelUtil.emitError(self.eventEmitter, err.message);
  728. return;
  729. }
  730. ModelUtil.emitOK(self.eventEmitter, res);
  731. });
  732. }
  733. ], function (err, res) {
  734. if (err) {
  735. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  736. }
  737. });
  738. }
  739. /**
  740. * 保存消息。
  741. *
  742. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  743. *
  744. * see also: saveMessageByTopic
  745. *
  746. * @param message
  747. * @param sessionId
  748. */
  749. saveMessageBySession(sessionId, message) {
  750. let self = this;
  751. let messages = new Messages();
  752. let participants = new Participants();
  753. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  754. let messageId = mongoose.Types.ObjectId().toString();
  755. message.id = messageId;
  756. // 检查会话中是否存在此成员
  757. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  758. if (err) {
  759. ModelUtil.emitError(self.eventEmitter, "Check session participant failed: ", err);
  760. return;
  761. }
  762. if (res) {
  763. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  764. let sessionType = res[0];
  765. let sessionName = res[1];
  766. if (sessionType == null) {
  767. ModelUtil.emitError(self.eventEmitter, "Session " + sessionId + " is not found.");
  768. return;
  769. }
  770. // 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间
  771. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  772. Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
  773. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  774. // 更新MYSQL中会话的最新状态,并保存消息
  775. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  776. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  777. if (err) {
  778. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  779. } else {
  780. message.timestamp = message.timestamp.getTime();
  781. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  782. }
  783. });
  784. }).then(function (res) {
  785. // 推送消息
  786. ParticipantRepo.findIds(sessionId, function (err, res) {
  787. if (err) {
  788. ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
  789. } else {
  790. message.session_id = sessionId;
  791. res.forEach(function (participant) {
  792. if (participant.id !== message.sender_id) {
  793. Sessions.pushNotification(participant.id, message);
  794. }
  795. });
  796. }
  797. })
  798. }).catch(function (err) {
  799. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  800. })
  801. } else {
  802. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  803. }
  804. });
  805. }
  806. sendTopicMessages(topicId, message) {
  807. let self = this;
  808. TopicRepo.findAllByTopicId(topicId, function (err, res) {
  809. if (err || res.length == 0) {
  810. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": "议题获取失败"});
  811. return;
  812. }
  813. self.saveMessageByTopic(message, res[0].session_id, function (err, messageId) {
  814. if (err) {
  815. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": err});
  816. } else {
  817. message.id = messageId;
  818. ModelUtil.emitOK(self.eventEmitter, {status: 200, "message": "发送成功", data: message});
  819. }
  820. });
  821. });
  822. }
  823. /**
  824. * 保存消息
  825. *
  826. * @param message
  827. * @param sessionId
  828. * @param handler
  829. */
  830. saveMessageByTopic(message, sessionId, handler) {
  831. let messages = new Messages();
  832. let participants = new Participants();
  833. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  834. let messageId = mongoose.Types.ObjectId().toString();
  835. let self = this;
  836. let sessionType = 0;
  837. let sessionName = "";
  838. message.id = messageId;
  839. // 发送成员必须处于会话中
  840. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  841. if (res) {
  842. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  843. sessionType = res[0];
  844. sessionName = res[1];
  845. if (!sessionType || !sessionName) {
  846. logger.error("Unknown session key " + session_key);
  847. if (handler) return handler(new Error("Unknown session key " + session_key));
  848. }
  849. }).then(function (res) {
  850. // 消息数据双写,并更新用户最后消息获取时间,会话新状态等
  851. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  852. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  853. // 更新会话最新状态及成员最后一次消息获取时间
  854. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  855. Messages.updateLastContent(session_key, sessionType, sessionName, message);
  856. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  857. if (handler) handler(null, messageId);
  858. }).then(function (res) {
  859. // 推送消息
  860. ParticipantRepo.findIds(sessionId, function (err, res) {
  861. if (err) {
  862. if (handler) handler(err, messageId)
  863. } else {
  864. message.session_id = sessionId;
  865. res.forEach(function (participant) {
  866. if (participant.id !== message.sender_id) {
  867. Sessions.pushNotification(participant.id, message);
  868. }
  869. });
  870. }
  871. })
  872. }).catch(function (err) {
  873. if (handler) handler(err, messageId)
  874. })
  875. } else {
  876. if (handler) handler("用户不在此会话当中!", messageId);
  877. }
  878. })
  879. }
  880. /**
  881. * 置顶操作
  882. */
  883. stickSession(sessionId, user) {
  884. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  885. let self = this;
  886. //取出最大的session
  887. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  888. //获取该session的时间搓
  889. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  890. let nowtime = new Date().getTime();
  891. //当前时间搓比redis的时间搓更早证明没有置顶过
  892. if (scoreres <= nowtime) {
  893. //初始化置顶
  894. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  895. logger.info("stickSession:" + sessionId + ",res:" + res);
  896. ModelUtil.emitOK(self.eventEmitter, {});
  897. }).then(function () {
  898. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  899. })
  900. } else {
  901. //已有置顶的数据,取出来加1保存回去
  902. scoreres = Number(scoreres) + 1;
  903. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  904. logger.info("stickSession:" + sessionId + ",res:" + res);
  905. ModelUtil.emitOK(self.eventEmitter, {});
  906. }).then(function () {
  907. SessionRepo.saveStickySession(sessionId, user, scoreres);
  908. })
  909. }
  910. })
  911. })
  912. }
  913. /**
  914. * 取消会话置顶
  915. */
  916. cancelStickSession(sessionId, user) {
  917. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  918. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  919. let self = this;
  920. redis.zscoreAsync(participants_key, user).then(function (res) {
  921. if (!res) {
  922. res = new Date().getTime();
  923. }
  924. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  925. logger.info("cancelStickSession:" + sessionId);
  926. ModelUtil.emitOK(self.eventEmitter, {});
  927. }).then(function () {
  928. SessionRepo.unstickSession(sessionId, user);
  929. });
  930. })
  931. }
  932. /**
  933. * 更新会话参与者的最后消息获取时间。
  934. *
  935. * @param sessionId
  936. * @param userId
  937. */
  938. static updateParticipantLastFetchTime(sessionId, userId, score) {
  939. score = score + 1;
  940. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  941. redis.zaddAsync(participantsKey, score, userId)
  942. .then(function (res) {
  943. ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) {
  944. if (err) {
  945. logger.error("Update participant last fetch time failed: ", err);
  946. }
  947. });
  948. })
  949. .catch(function (err) {
  950. logger.error("Update participant last fetch time failed: ", err);
  951. });
  952. }
  953. /**
  954. * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。
  955. *
  956. * @param targetUserId
  957. * @param message
  958. */
  959. static pushNotification(targetUserId, message) {
  960. Users.isPatientId(targetUserId, function (err, isPatient) {
  961. if (isPatient) {
  962. WechatClient.sendMessage(targetUserId, message);
  963. }
  964. else {
  965. AppClient.sendNotification(targetUserId, message);
  966. }
  967. });
  968. }
  969. }
  970. // Expose class
  971. module.exports = Sessions;