sessions.js 43 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036
  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Users = require('../user/users');
  10. let Participants = require('./participants');
  11. let SessionRepo = require('../../repository/mysql/session.repo');
  12. let TopicRepo = require('../../repository/mysql/topics.repo');
  13. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  14. let WechatClient = require("../client/wechat.client.js");
  15. let AppClient = require("../client/app.client.js");
  16. let configFile = require('../../include/commons').CONFIG_FILE;
  17. let config = require('../../resources/config/' + configFile);
  18. let redis = RedisClient.redisClient().connection;
  19. let logger = require('../../util/log.js');
  20. let mongoose = require('mongoose');
  21. let async = require("async");
  22. let log = require("../../util/log.js");
  23. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  24. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  25. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  26. const PARTICIPANT_ROLES = require('../../include/commons').PARTICIPANT_ROLES;
  27. class Sessions extends RedisModel {
  28. constructor() {
  29. super();
  30. }
  31. /**
  32. * 创建会话。会话的ID来源:
  33. * MUC:患者的ID
  34. * P2P:对成员的ID排序后,取hash值
  35. * GROUP:团队的ID
  36. *
  37. * @param sessionId
  38. * @param name 会话名称
  39. * @param type 会话类型
  40. * @param participantArray 会话成员
  41. * @param handler 回调,仅MUC模式使用
  42. */
  43. createSession(sessionId, name, type, participantArray, handler) {
  44. let self = this;
  45. let messageId = mongoose.Types.ObjectId().toString();
  46. //创建session到mysql
  47. self.createSessionToMysql(sessionId, name, type, participantArray, messageId, function (err, res) {
  48. if (err) {
  49. if (handler) {
  50. handler(err, null);
  51. return;
  52. }
  53. ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
  54. } else {
  55. //创建session到redis
  56. self.createSessionToRedis(sessionId, name, type, participantArray, messageId, function (err, res) {
  57. if (err) {
  58. if (handler) {
  59. handler(err, null);
  60. return;
  61. }
  62. ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
  63. } else {
  64. if (handler) {
  65. handler(null, res);
  66. return;
  67. }
  68. ModelUtil.emitOK(self.eventEmitter, {status: 200, data: res});
  69. }
  70. });
  71. }
  72. });
  73. }
  74. /**
  75. * 创建会话。REDIS
  76. * @param sessionId
  77. * @param name
  78. * @param type
  79. * @param participantArray
  80. * @param messageId
  81. * @param handler
  82. * @returns {boolean}
  83. */
  84. createSessionToRedis(sessionId, name, type, participantArray, messageId, handler) {
  85. let self = this;
  86. let messages = new Messages();
  87. let participantIdArray = [];
  88. for (let i in participantArray) {
  89. participantIdArray.push(participantArray[i].split(":")[0]);
  90. }
  91. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  92. if (sessionId) {
  93. callBusinessType(sessionId);
  94. return;
  95. }
  96. if (participantIdArray.length != 2) {
  97. handler("P2P session only allow 2 participants.", null);
  98. return false;
  99. }
  100. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  101. sessionId = res;
  102. callBusinessType(sessionId);
  103. });
  104. } else {
  105. if (!sessionId) {
  106. handler("MUC OR GROUP session sessionId is not allow null .", null);
  107. return;
  108. }
  109. callBusinessType(sessionId);
  110. }
  111. function callBusinessType(sessionId) {
  112. ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) {
  113. callCreate(sessionId, businessType);
  114. });
  115. }
  116. function callCreate(sessionId, businessType) {
  117. let createDate = new Date();
  118. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  119. let message = {
  120. sender_id: "system",
  121. sender_name: "system",
  122. content_type: 11,
  123. content: "会话创建成功",
  124. timestamp: createDate,
  125. id: messageId
  126. };
  127. if (type == SESSION_TYPES.MUC) {
  128. businessType = 2;
  129. }
  130. let session = {
  131. id: sessionId,
  132. name: name,
  133. type: type,
  134. create_date: createDate.getTime(),
  135. business_type: businessType,
  136. last_sender_id: message.sender_id,
  137. last_sender_name: message.sender_name,
  138. last_message_time: message.timestamp.getTime(),
  139. last_content: message.content,
  140. last_content_type: message.content_type
  141. };
  142. redis.hmsetAsync(sessionKey, session).then(function () {
  143. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  144. handler(null, session);
  145. //messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message);
  146. });
  147. })
  148. }
  149. }
  150. /**
  151. * 创建会话。mysql
  152. * @param sessionId
  153. * @param name
  154. * @param type
  155. * @param participantArray
  156. * @param messageId
  157. * @param handler
  158. */
  159. createSessionToMysql(sessionId, name, type, participantArray, messageId, handler) {
  160. let self = this;
  161. //如果sessionId不存在则执行创建sessionId过程
  162. let participantIdArray = [];
  163. for (let i in participantArray) {
  164. participantIdArray.push(participantArray[i].split(":")[0]);
  165. }
  166. //流程1-判断是否存在sessionId不存在则创建对应的sessionId;
  167. if (!sessionId) {
  168. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  169. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  170. sessionId = res;
  171. callBusinessType();
  172. });
  173. } else {
  174. return handler("MUC模式和团队模式,不允许sessionId为空!", null);
  175. }
  176. } else {
  177. callBusinessType();
  178. }
  179. //流程2-判断session的业务类型;
  180. function callBusinessType() {
  181. ParticipantRepo.getBusinessType(participantIdArray, function (err, businessType) {
  182. if (err) {
  183. handler(err, null);
  184. return;
  185. }
  186. callCreateSession(businessType);
  187. });
  188. }
  189. //流程3-发起session创建 返回session实例
  190. function callCreateSession(businessType) {
  191. //查找该sessionId是否存在存在则直接返回实例
  192. SessionRepo.findOne(sessionId, function (err, res) {
  193. if (res.length > 0) {//已经存在
  194. //更新成员
  195. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, update) {
  196. handler(null, res[0]);
  197. return;
  198. })
  199. } else {
  200. let createDate = new Date();
  201. let session = {
  202. id: sessionId,
  203. name: name,
  204. type: type,
  205. create_date: createDate.getTime(),
  206. business_type: businessType
  207. };
  208. //将session写入数据库
  209. self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
  210. if (err) {
  211. handler(err, null);
  212. return;
  213. }
  214. callCreateParticipants(session);
  215. })
  216. }
  217. });
  218. }
  219. //流程4-发起session成员创建
  220. function callCreateParticipants(session) {
  221. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  222. if (err) {
  223. handler(err, null);
  224. return;
  225. } else {
  226. handler(null, session);
  227. return;
  228. }
  229. })
  230. }
  231. }
  232. /**
  233. * 最近会话列表,7天内。
  234. *
  235. * @param userId
  236. * @param dateSpan
  237. */
  238. getRecentSessions(userId, dateSpan) {
  239. let self = this;
  240. SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) {
  241. if (err) {
  242. ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err);
  243. return;
  244. }
  245. let sessions = [];
  246. res.forEach(function (session) {
  247. sessions.push({
  248. id: session.id,
  249. name: session.name,
  250. type: session.type,
  251. business_type: session.business_type,
  252. create_date: session.create_date
  253. })
  254. });
  255. ModelUtil.emitOK(self.eventEmitter, sessions);
  256. });
  257. }
  258. /**
  259. * 保存session到MySQL
  260. * @param sessionId
  261. * @param name
  262. * @param type
  263. * @param createDate
  264. * @param businessType
  265. * @param handler
  266. */
  267. saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {
  268. SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler);
  269. }
  270. /**
  271. * 获取某个用户的全部session列表
  272. * @param userId
  273. * @param handler
  274. */
  275. getUserSessionsFromMysql(userId, handler) {
  276. SessionRepo.findAll(userId, handler);
  277. }
  278. /**
  279. * 获取session单个对象
  280. * @param sessionId
  281. * @param handler
  282. */
  283. getSessions(sessionId, handler) {
  284. SessionRepo.findOne(sessionId, handler);
  285. }
  286. /**
  287. * 根据用户ID获取用户的session列表
  288. * @param userId
  289. * @param page
  290. * @param size
  291. * @param businessType
  292. */
  293. getUserSessions(userId, page, size, businessType) {
  294. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  295. let self = this;
  296. if (page > 0) {
  297. if (page == 1) {
  298. page = 0;
  299. }
  300. page = page + page * size;
  301. }
  302. async.waterfall([
  303. // 获取会话ID列表
  304. function (callback) {
  305. redis.zrevrangeAsync(userSessionKey, page, size)
  306. .then(function (sessionIds) {
  307. if (sessionIds.length == 0) {
  308. ModelUtil.emitOK(self.eventEmitter, []);
  309. return;
  310. }
  311. callback(null, sessionIds);
  312. })
  313. },
  314. // 遍历会话
  315. function (sessionIds) {
  316. let sessionList = [];
  317. let functionList = [];
  318. for (let j = 0; j < sessionIds.length; j++) {
  319. let fun = function (index, callback) {
  320. if (!callback) {
  321. callback = index, index = 0
  322. }
  323. let sessionId = sessionIds[index];
  324. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  325. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  326. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  327. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  328. redis.multi()
  329. .hgetall(sessionKey) // 会话实体
  330. .hget(participantsRoleKey, userId) // 用户在此会话中的角色
  331. .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
  332. .zrange(participantsKey, 0, -1)
  333. .execAsync()
  334. .then(function (res) {
  335. let session = res[0];
  336. let role = res[1];
  337. let lastFetchTime = res[2];
  338. let users = res[3];
  339. let sessionName = "";
  340. let otherUserId = "";
  341. if (session.type == SESSION_TYPES.P2P) {
  342. for (let j in users) {
  343. if (users[j] != userId) {
  344. otherUserId = users[j];
  345. }
  346. }
  347. }
  348. if (!role) role = 0;
  349. if (!lastFetchTime) lastFetchTime = new Date().getTime();
  350. // 计算未读消息数
  351. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  352. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
  353. .then(function (count) {
  354. if (!otherUserId) otherUserId = userId;
  355. ParticipantRepo.findNameById(otherUserId, function (err, res) {
  356. if ((res && res.length == 0) || session.type != SESSION_TYPES.P2P) {
  357. sessionName = session.name;
  358. } else {
  359. sessionName = res[0].name;
  360. }
  361. var bir = new Date().getTime();
  362. if (res.length != 0 && res[0].birthdate) {
  363. bir = res[0].birthdate.getTime();
  364. }
  365. var sex = 1;
  366. if (res.length != 0 && res[0].sex) {
  367. sex = res[0].sex;
  368. }
  369. sessionList.push({
  370. id: sessionId,
  371. name: sessionName,
  372. create_date: session.create_date,
  373. last_content_type: session.last_content_type,
  374. last_content: session.last_content,
  375. sender_id: session.sender_id,
  376. type: session.type,
  377. sender_name: session.sender_name,
  378. unread_count: count,
  379. business_type: session.business_type,
  380. my_role: role,
  381. sender_sex: sex,
  382. sender_birthday: bir,
  383. });
  384. index = (parseInt(index) + 1);
  385. if (index == sessionIds.length) {
  386. ModelUtil.emitOK(self.eventEmitter, sessionList);
  387. } else {
  388. callback(null, index);
  389. }
  390. })
  391. })
  392. })
  393. .catch(function (err) {
  394. logger.error("Get sessions failed: ", err);
  395. });
  396. };
  397. functionList.push(fun);
  398. }
  399. async.waterfall(functionList);
  400. }
  401. ]);
  402. }
  403. /**
  404. * 获取会话消息。全部,不管已读/未读状态。
  405. *
  406. * @param sessionId 会话ID
  407. * @param userId 拉取消息的人
  408. * @param page 第几页
  409. * @param pagesize 分页数量
  410. * @param start_msg_id 消息会话最新的一条消息的ID
  411. * @param end_msg_id 消息会话刚开始的消息ID
  412. */
  413. getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, handler) {
  414. let self = this;
  415. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  416. if (!start_msg_id && !end_msg_id) {
  417. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  418. if (res.length == 0) {
  419. if (handler) {
  420. handler(null, res);
  421. return;
  422. }
  423. ModelUtil.emitOK(self.eventEmitter, res);
  424. return;
  425. }
  426. start_msg_id = res[0];
  427. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  428. if (res.length == 0) {
  429. if (handler) {
  430. handler(null, res);
  431. return;
  432. }
  433. ModelUtil.emitOK(self.eventEmitter, res);
  434. return;
  435. }
  436. end_msg_id = res[0];
  437. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  438. if (err) {
  439. if (handler) {
  440. handler(err, null);
  441. return;
  442. }
  443. logger.error("getMessagesByPage error" + err);
  444. ModelUtil.emitError(self.eventEmitter, err, err);
  445. } else {
  446. if (handler) {
  447. handler(null, res);
  448. return;
  449. }
  450. ModelUtil.emitOK(self.eventEmitter, res);
  451. }
  452. })
  453. })
  454. })
  455. } else if (!start_msg_id) {
  456. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  457. if (res.length == 0) {
  458. if (handler) {
  459. handler(null, res);
  460. return;
  461. }
  462. ModelUtil.emitOK(self.eventEmitter, res);
  463. return;
  464. }
  465. start_msg_id = res[0];
  466. self.getMessagesByPage(sessionId, user, start_msg_id,end_msg_id , page, pagesize, isoffset, function (err, res) {
  467. if (err) {
  468. if (handler) {
  469. handler(err, null);
  470. return;
  471. }
  472. logger.error("getMessagesByPage error" + err);
  473. ModelUtil.emitError(self.eventEmitter, err, err);
  474. } else {
  475. if (handler) {
  476. handler(null, res);
  477. return;
  478. }
  479. ModelUtil.emitOK(self.eventEmitter, res);
  480. }
  481. })
  482. })
  483. } else if (!end_msg_id) {
  484. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  485. if (res.length == 0) {
  486. ModelUtil.emitOK(self.eventEmitter, res);
  487. return;
  488. }
  489. end_msg_id = res[0];
  490. self.getMessagesByPage(sessionId, user, end_msg_id,start_msg_id, page, pagesize, isoffset, function (err, res) {
  491. if (err) {
  492. if (handler) {
  493. handler(err, null);
  494. return;
  495. }
  496. logger.error("getMessagesByPage error" + err);
  497. ModelUtil.emitError(self.eventEmitter, err, err);
  498. } else {
  499. if (handler) {
  500. handler(null, res);
  501. return;
  502. }
  503. ModelUtil.emitOK(self.eventEmitter, res);
  504. }
  505. })
  506. })
  507. } else {
  508. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  509. if (err) {
  510. if (handler) {
  511. handler(err, null);
  512. return;
  513. }
  514. logger.error("getMessagesByPage error" + err);
  515. ModelUtil.emitError(self.eventEmitter, err, err);
  516. } else {
  517. if (handler) {
  518. handler(null, res);
  519. return;
  520. }
  521. ModelUtil.emitOK(self.eventEmitter, res);
  522. }
  523. })
  524. }
  525. }
  526. /**
  527. * 分页获取会话消息。
  528. *
  529. * @param sessionId 必选。会话ID
  530. * @param userId 必选。用户ID
  531. * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据
  532. * @param endMsgId 必选。会话中的结束消息ID
  533. * @param page 必选。页码
  534. * @param size 必选。页面大小
  535. * @param handler 必选。回调
  536. */
  537. getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
  538. let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  539. let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  540. let participants = new Participants();
  541. let offset = (page - 1 < 0 ? 0 : page - 1) * size;
  542. let count = size;
  543. if (page > 1 || isoffset == 1) {
  544. offset += 1; // 翻页由于闭区间,需跳过本身数据
  545. }
  546. participants.existsParticipant(sessionId, userId, function (err, res) {
  547. if (!res) {
  548. handler(Error("User not found in session " + sessionId), null);
  549. } else {
  550. //将消息ID转换成分值
  551. redis.multi()
  552. .zscore(messagesTimestampKey, startMsgId)
  553. .zscore(messagesTimestampKey, endMsgId)
  554. .execAsync()
  555. .then(function (res) {
  556. let startMsgScore = res[1];
  557. let endMsgScore = res[0];
  558. if (startMsgScore == null || endMsgScore == null || (startMsgScore == endMsgScore && isoffset == 1)) {
  559. handler(null, []);
  560. return;
  561. }
  562. if(endMsgScore>startMsgScore){
  563. console.error("endMsgScore>startMsgScore");
  564. redis.zrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  565. .then(function (res) {
  566. if (res.length == 0) {
  567. handler(null, []);
  568. return;
  569. }
  570. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  571. handler(null, messages);
  572. }).then(function () {
  573. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  574. })
  575. })
  576. .catch(function (err) {
  577. logger.error("Get message by page failed: ", err);
  578. handler(err, false);
  579. })
  580. }else{
  581. console.error(3);
  582. // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
  583. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  584. .then(function (res) {
  585. if (res.length == 0) {
  586. handler(null, []);
  587. return;
  588. }
  589. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  590. handler(null, messages);
  591. }).then(function () {
  592. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  593. })
  594. })
  595. .catch(function (err) {
  596. logger.error("Get message by page failed: ", err);
  597. handler(err, false);
  598. })
  599. }
  600. })
  601. }
  602. })
  603. }
  604. /**
  605. * 获取所有会话的未读消息数。
  606. */
  607. getAllSessionsUnreadMessageCount(userId) {
  608. let self = this;
  609. let count = 0;
  610. let patientCount = 0;
  611. let doctorCount = 0;
  612. SessionRepo.findAll(userId, function (err, res) {
  613. if (err) {
  614. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  615. return;
  616. }
  617. if (res.length == 0) {
  618. ModelUtil.emitOK(self.eventEmitter, {count: count});
  619. return;
  620. }
  621. for (let j in res) {
  622. if (res[j].type == SESSION_TYPES.SYSTEM) {
  623. if (j == res.length - 1) {
  624. ModelUtil.emitOK(self.eventEmitter, {count: count});
  625. }
  626. continue;
  627. }
  628. callback(res, j, res[j]);
  629. }
  630. });
  631. function callback(res, j, session) {
  632. self.getSessionUnreadMessageCount(res[j].id, userId, function (err, con) {
  633. if (err) {
  634. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  635. }
  636. count = count + con;
  637. if (session.type == 2) {
  638. patientCount = patientCount + con;
  639. } else {
  640. doctorCount = doctorCount + con;
  641. }
  642. if (j == res.length - 1) {
  643. ModelUtil.emitOK(self.eventEmitter, {count: count, patient: patientCount, doctor: doctorCount});
  644. }
  645. })
  646. }
  647. }
  648. /**
  649. * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
  650. *
  651. * @param sessionId
  652. * @param userId
  653. */
  654. getSessionUnreadMessageCount(sessionId, userId, handler) {
  655. let self = this;
  656. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  657. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  658. async.waterfall([
  659. // 此成员最后获取消息的时间
  660. function (callback) {
  661. redis.zscoreAsync(participantsKey, userId)
  662. .then(function (lastFetchTime) {
  663. callback(null, lastFetchTime);
  664. })
  665. },
  666. // 计算最后获取消息的时间之后到现在有多少条消息
  667. function (lastFetchTime, callback) {
  668. if (!lastFetchTime) lastFetchTime = 0;
  669. let now = new Date().getTime();
  670. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
  671. .then(function (count) {
  672. if (handler) {
  673. handler(null, count);
  674. } else {
  675. ModelUtil.emitOK(self.eventEmitter, {count: count});
  676. }
  677. })
  678. }
  679. ], function (err, res) {
  680. if (err) {
  681. if (handler) {
  682. handler(err, 0);
  683. } else {
  684. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  685. }
  686. }
  687. });
  688. }
  689. /**
  690. * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
  691. */
  692. getSessionUnreadMessages(sessionId, userId) {
  693. let self = this;
  694. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  695. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  696. async.waterfall([
  697. // 此成员最后获取消息的时间
  698. function (callback) {
  699. redis.zscoreAsync(participantsKey, userId)
  700. .then(function (lastFetchTime) {
  701. callback(null, lastFetchTime);
  702. })
  703. },
  704. // 最后获取消息的时间之后到现在的消息ID列表
  705. function (lastFetchTime, callback) {
  706. if (!lastFetchTime) lastFetchTime = 0;
  707. let now = new Date().getTime();
  708. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
  709. .then(function (messageIds) {
  710. callback(null, messageIds);
  711. })
  712. },
  713. // 获取消息
  714. function (messageIds, callback) {
  715. if (messageIds.length == 0) {
  716. ModelUtil.emitOK(self.eventEmitter, []);
  717. return;
  718. }
  719. let startMsgId = messageIds[0];
  720. let endMsgId = messageIds[messageIds.length - 1];
  721. self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
  722. if (err) {
  723. ModelUtil.emitError(self.eventEmitter, err.message);
  724. return;
  725. }
  726. ModelUtil.emitOK(self.eventEmitter, res);
  727. });
  728. }
  729. ], function (err, res) {
  730. if (err) {
  731. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  732. }
  733. });
  734. }
  735. /**
  736. * 保存消息。
  737. *
  738. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  739. *
  740. * see also: saveMessageByTopic
  741. *
  742. * @param message
  743. * @param sessionId
  744. */
  745. saveMessageBySession(sessionId, message) {
  746. let self = this;
  747. let messages = new Messages();
  748. let participants = new Participants();
  749. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  750. let messageId = mongoose.Types.ObjectId().toString();
  751. message.id = messageId;
  752. // 检查会话中是否存在此成员
  753. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  754. if (err) {
  755. ModelUtil.emitError(self.eventEmitter, "Check session participant failed: ", err);
  756. return;
  757. }
  758. if (res) {
  759. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  760. let sessionType = res[0];
  761. let sessionName = res[1];
  762. if (sessionType == null) {
  763. ModelUtil.emitError(self.eventEmitter, "Session " + sessionId + " is not found.");
  764. return;
  765. }
  766. // 消息保存到Redis,并更新会话最后状态、用户最后消息获取时间
  767. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  768. Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
  769. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  770. // 更新MYSQL中会话的最新状态,并保存消息
  771. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  772. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  773. if (err) {
  774. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  775. } else {
  776. message.timestamp = message.timestamp.getTime();
  777. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  778. }
  779. });
  780. }).then(function (res) {
  781. // 推送消息
  782. ParticipantRepo.findIds(sessionId, function (err, res) {
  783. if (err) {
  784. ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
  785. } else {
  786. message.session_id = sessionId;
  787. res.forEach(function (participant) {
  788. if (participant.id !== message.sender_id &&
  789. participant.participant_role == PARTICIPANT_ROLES.HOST) {
  790. Sessions.pushNotification(participant.id, message);
  791. }
  792. });
  793. }
  794. })
  795. }).catch(function (err) {
  796. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  797. })
  798. } else {
  799. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  800. }
  801. });
  802. }
  803. sendTopicMessages(topicId, message) {
  804. let self = this;
  805. TopicRepo.findAllByTopicId(topicId, function (err, res) {
  806. if (err || res.length == 0) {
  807. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": "议题获取失败"});
  808. return;
  809. }
  810. self.saveMessageByTopic(message, res[0].session_id, function (err, messageId) {
  811. if (err) {
  812. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": err});
  813. } else {
  814. message.id = messageId;
  815. ModelUtil.emitOK(self.eventEmitter, {status: 200, "message": "发送成功", data: message});
  816. }
  817. });
  818. });
  819. }
  820. /**
  821. * 保存消息
  822. *
  823. * @param message
  824. * @param sessionId
  825. * @param handler
  826. */
  827. saveMessageByTopic(message, sessionId, handler) {
  828. let messages = new Messages();
  829. let participants = new Participants();
  830. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  831. let messageId = mongoose.Types.ObjectId().toString();
  832. let self = this;
  833. let sessionType = 0;
  834. let sessionName = "";
  835. message.id = messageId;
  836. // 发送成员必须处于会话中
  837. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  838. if (res) {
  839. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  840. sessionType = res[0];
  841. sessionName = res[1];
  842. if (!sessionType || !sessionName) {
  843. logger.error("Unknown session key " + session_key);
  844. if (handler) return handler(new Error("Unknown session key " + session_key));
  845. }
  846. }).then(function (res) {
  847. // 消息数据双写,并更新用户最后消息获取时间,会话新状态等
  848. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  849. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  850. // 更新会话最新状态及成员最后一次消息获取时间
  851. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  852. Messages.updateLastContent(session_key, sessionType, sessionName, message);
  853. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  854. if (handler) handler(null, messageId);
  855. }).then(function (res) {
  856. // 推送消息
  857. ParticipantRepo.findIds(sessionId, function (err, res) {
  858. if (err) {
  859. if (handler) handler(err, messageId)
  860. } else {
  861. message.session_id = sessionId;
  862. res.forEach(function (participant) {
  863. if (participant.id !== message.sender_id) {
  864. Sessions.pushNotification(participant.id, message);
  865. }
  866. });
  867. }
  868. })
  869. }).catch(function (err) {
  870. if (handler) handler(err, messageId)
  871. })
  872. } else {
  873. if (handler) handler("用户不在此会话当中!", messageId);
  874. }
  875. })
  876. }
  877. /**
  878. * 置顶操作
  879. */
  880. stickSession(sessionId, user) {
  881. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  882. let self = this;
  883. //取出最大的session
  884. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  885. //获取该session的时间搓
  886. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  887. let nowtime = new Date().getTime();
  888. //当前时间搓比redis的时间搓更早证明没有置顶过
  889. if (scoreres <= nowtime) {
  890. //初始化置顶
  891. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  892. logger.info("stickSession:" + sessionId + ",res:" + res);
  893. ModelUtil.emitOK(self.eventEmitter, {});
  894. }).then(function () {
  895. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  896. })
  897. } else {
  898. //已有置顶的数据,取出来加1保存回去
  899. scoreres = Number(scoreres) + 1;
  900. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  901. logger.info("stickSession:" + sessionId + ",res:" + res);
  902. ModelUtil.emitOK(self.eventEmitter, {});
  903. }).then(function () {
  904. SessionRepo.saveStickySession(sessionId, user, scoreres);
  905. })
  906. }
  907. })
  908. })
  909. }
  910. /**
  911. * 取消会话置顶
  912. */
  913. cancelStickSession(sessionId, user) {
  914. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  915. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  916. let self = this;
  917. redis.zscoreAsync(participants_key, user).then(function (res) {
  918. if (!res) {
  919. res = new Date().getTime();
  920. }
  921. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  922. logger.info("cancelStickSession:" + sessionId);
  923. ModelUtil.emitOK(self.eventEmitter, {});
  924. }).then(function () {
  925. SessionRepo.unstickSession(sessionId, user);
  926. });
  927. })
  928. }
  929. /**
  930. * 更新会话参与者的最后消息获取时间。
  931. *
  932. * @param sessionId
  933. * @param userId
  934. */
  935. static updateParticipantLastFetchTime(sessionId, userId, score) {
  936. score = score + 1;
  937. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  938. redis.zaddAsync(participantsKey, score, userId)
  939. .then(function (res) {
  940. ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) {
  941. if (err) {
  942. logger.error("Update participant last fetch time failed: ", err);
  943. }
  944. });
  945. })
  946. .catch(function (err) {
  947. logger.error("Update participant last fetch time failed: ", err);
  948. });
  949. }
  950. /**
  951. * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。
  952. *
  953. * @param targetUserId
  954. * @param message
  955. */
  956. static pushNotification(targetUserId, message) {
  957. Users.isPatientId(targetUserId, function (err, isPatient) {
  958. if (isPatient) {
  959. WechatClient.sendMessage(targetUserId, message);
  960. }
  961. else {
  962. AppClient.sendNotification(targetUserId, message);
  963. }
  964. });
  965. }
  966. }
  967. // Expose class
  968. module.exports = Sessions;