sessions.js 40 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Users = require('../user/users');
  10. let Participants = require('./participants');
  11. let SessionRepo = require('../../repository/mysql/session.repo');
  12. let TopicRepo = require('../../repository/mysql/topics.repo');
  13. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  14. let WechatClient = require("../client/wechat.client.js");
  15. let AppClient = require("../client/app.client.js");
  16. let configFile = require('../../include/commons').CONFIG_FILE;
  17. let config = require('../../resources/config/' + configFile);
  18. let redis = RedisClient.redisClient().connection;
  19. let logger = require('../../util/log.js');
  20. let mongoose = require('mongoose');
  21. let async = require("async");
  22. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  23. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  24. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  25. class Sessions extends RedisModel {
  26. constructor() {
  27. super();
  28. }
  29. /**
  30. * 创建会话。会话的ID来源:
  31. * MUC:患者的ID
  32. * P2P:对成员的ID排序后,取hash值
  33. * GROUP:团队的ID
  34. *
  35. * @param sessionId
  36. * @param name 会话名称
  37. * @param type 会话类型
  38. * @param participantArray 会话成员
  39. * @param handler 回调,仅MUC模式使用
  40. */
  41. createSession(sessionId, name, type, participantArray, handler) {
  42. let self = this;
  43. let messageId = mongoose.Types.ObjectId().toString();
  44. //创建session到mysql
  45. self.createSessionToMysql(sessionId, name, type, participantArray, messageId, function (err, res) {
  46. if (err) {
  47. logger.error(err);
  48. } else {
  49. logger.info("create session to mysql success :" + JSON.stringify(res));
  50. }
  51. });
  52. //创建session到redis
  53. self.createSessionToRedis(sessionId, name, type, participantArray, messageId, function (err, res) {
  54. if (err) {
  55. if (handler) {
  56. handler(err, null);
  57. return;
  58. }
  59. ;
  60. ModelUtil.emitError(self.eventEmitter, {message: err, status: -1}, null);
  61. } else {
  62. if (handler) {
  63. handler(null, res);
  64. return;
  65. }
  66. ;
  67. ModelUtil.emitOK(self.eventEmitter, {status: 200, data: res});
  68. }
  69. });
  70. }
  71. /**
  72. * 创建会话。REDIS
  73. * @param sessionId
  74. * @param name
  75. * @param type
  76. * @param participantArray
  77. * @param messageId
  78. * @param handler
  79. * @returns {boolean}
  80. */
  81. createSessionToRedis(sessionId, name, type, participantArray, messageId, handler) {
  82. let self = this;
  83. let messages = new Messages();
  84. let participantIdArray = [];
  85. for (let i in participantArray) {
  86. participantIdArray.push(participantArray[i].split(":")[0]);
  87. }
  88. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  89. if (sessionId) {
  90. callBusinessType(sessionId);
  91. return;
  92. }
  93. if (participantIdArray.length != 2) {
  94. handler("P2P session only allow 2 participants.", null);
  95. return false;
  96. }
  97. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  98. sessionId = res;
  99. callBusinessType(sessionId);
  100. });
  101. } else {
  102. if (!sessionId) {
  103. handler("MUC OR GROUP session sessionId is not allow null .", null);
  104. return;
  105. }
  106. callBusinessType(sessionId);
  107. }
  108. function callBusinessType(sessionId) {
  109. ParticipantRepo.getBusinessType(participantIdArray.join("','"), function (err, businessType) {
  110. callCreate(sessionId, businessType);
  111. });
  112. }
  113. function callCreate(sessionId, businessType) {
  114. let createDate = new Date();
  115. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  116. let message = {
  117. sender_id: "system",
  118. sender_name: "system",
  119. content_type: 11,
  120. content: "会话创建成功",
  121. timestamp: createDate,
  122. id: messageId
  123. };
  124. let session = {
  125. id: sessionId,
  126. name: name,
  127. type: type,
  128. create_date: createDate.getTime(),
  129. business_type: businessType,
  130. last_sender_id: message.sender_id,
  131. last_sender_name: message.sender_name,
  132. last_message_time: message.timestamp.getTime(),
  133. last_content: message.content,
  134. last_content_type: message.content_type
  135. }
  136. redis.hmsetAsync(sessionKey, session).then(function () {
  137. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  138. handler(null, session);
  139. //messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message);
  140. });
  141. })
  142. }
  143. }
  144. /**
  145. * 创建会话。mysql
  146. * @param sessionId
  147. * @param name
  148. * @param type
  149. * @param participantArray
  150. * @param messageId
  151. * @param handler
  152. */
  153. createSessionToMysql(sessionId, name, type, participantArray, messageId, handler) {
  154. let self = this;
  155. //如果sessionId不存在则执行创建sessionId过程
  156. let participantIdArray = [];
  157. for (let i in participantArray) {
  158. participantIdArray.push(participantArray[i].split(":")[0]);
  159. }
  160. //流程1-判断是否存在sessionId不存在则创建对应的sessionId;
  161. if (!sessionId) {
  162. if (type == SESSION_TYPES.P2P || type == SESSION_TYPES.SYSTEM) {
  163. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  164. sessionId = res;
  165. callBusinessType();
  166. });
  167. } else {
  168. return handler("MUC模式和团队模式,不允许sessionId为空!", null);
  169. }
  170. } else {
  171. callBusinessType();
  172. }
  173. //流程2-判断session的业务类型;
  174. function callBusinessType() {
  175. ParticipantRepo.getBusinessType(participantIdArray.join("','"), function (err, businessType) {
  176. if (err) {
  177. handler(err, null);
  178. return;
  179. }
  180. callCreateSession(businessType);
  181. });
  182. }
  183. //流程3-发起session创建 返回session实例
  184. function callCreateSession(businessType) {
  185. //查找该sessionId是否存在存在则直接返回实例
  186. SessionRepo.findOne(sessionId, function (err, res) {
  187. if (res.length > 0) {//已经存在
  188. handler(null, res[0]);
  189. } else {
  190. let createDate = new Date();
  191. let session = {
  192. id: sessionId,
  193. name: name,
  194. type: type,
  195. create_date: createDate.getTime(),
  196. business_type: businessType
  197. };
  198. //将session写入数据库
  199. self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
  200. if (err) {
  201. handler(err, null);
  202. return;
  203. }
  204. ;
  205. callCreateParticipants(session);
  206. })
  207. }
  208. });
  209. }
  210. //流程4-发起session成员创建
  211. function callCreateParticipants(session) {
  212. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  213. if (err) {
  214. handler(err, null);
  215. return;
  216. }
  217. ;
  218. callBeginTrans(session);
  219. })
  220. }
  221. //流程5-发起session会话
  222. function callBeginTrans(session) {
  223. let mesDate = new Date();
  224. let message = {
  225. sender_id: "system",
  226. sender_name: "system",
  227. content_type: 6,
  228. content: "会话创建成功",
  229. timestamp: mesDate,
  230. id: messageId
  231. };
  232. session.last_sender_id = message.sender_id;
  233. session.last_sender_name = message.sender_name;
  234. session.last_message_time = mesDate.getTime();
  235. session.last_content = message.content;
  236. session.last_content_type = message.content_type;
  237. SessionRepo.updateSessionLastStatus(message.sender_id,
  238. message.sender_name,
  239. message.timestamp,
  240. message.content,
  241. message.content_type,
  242. sessionId, function (err, res) {
  243. if (err) {
  244. handler(err, null);
  245. return;
  246. }
  247. ;
  248. handler(null, session);
  249. });
  250. }
  251. }
  252. /**
  253. * 最近会话列表,7天内。
  254. *
  255. * @param userId
  256. * @param dateSpan
  257. */
  258. getRecentSessions(userId, dateSpan) {
  259. let self = this;
  260. SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) {
  261. if (err) {
  262. ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err);
  263. return;
  264. }
  265. let sessions = [];
  266. res.forEach(function (session) {
  267. sessions.push({
  268. id: session.id,
  269. name: session.name,
  270. type: session.type,
  271. business_type: session.business_type,
  272. create_date: session.create_date
  273. })
  274. });
  275. ModelUtil.emitOK(self.eventEmitter, sessions);
  276. });
  277. }
  278. /**
  279. * 保存session到MySQL
  280. * @param sessionId
  281. * @param name
  282. * @param type
  283. * @param createDate
  284. * @param businessType
  285. * @param handler
  286. */
  287. saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {
  288. SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler);
  289. }
  290. /**
  291. * 获取某个用户的全部session列表
  292. * @param userId
  293. * @param handler
  294. */
  295. getUserSessionsFromMysql(userId, handler) {
  296. SessionRepo.findAll(userId, handler);
  297. }
  298. /**
  299. * 获取session单个对象
  300. * @param sessionId
  301. * @param handler
  302. */
  303. getSessions(sessionId, handler) {
  304. SessionRepo.findOne(sessionId, handler);
  305. }
  306. /**
  307. * 根据用户ID获取用户的session列表
  308. * @param userId
  309. * @param page
  310. * @param size
  311. */
  312. getUserSessions(userId, page, size, businessType) {
  313. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  314. let self = this;
  315. if (page > 0) {
  316. if (page == 1) {
  317. page = 0;
  318. }
  319. page = page + page * size;
  320. }
  321. async.waterfall([
  322. // 获取会话ID列表
  323. function (callback) {
  324. redis.zrevrangeAsync(userSessionKey, page, size)
  325. .then(function (sessionIds) {
  326. if (sessionIds.length == 0) {
  327. ModelUtil.emitOK(self.eventEmitter, []);
  328. return;
  329. }
  330. callback(null, sessionIds);
  331. })
  332. },
  333. // 遍历会话
  334. function (sessionIds, callback) {
  335. let sessionList = [];
  336. sessionIds.forEach(function (sessionId) {
  337. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  338. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  339. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  340. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  341. redis.multi()
  342. .hgetall(sessionKey) // 会话实体
  343. .hget(participantsRoleKey, userId) // 用户在此会话中的角色
  344. .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
  345. .zrange(participantsKey, 0, -1)
  346. .execAsync()
  347. .then(function (res) {
  348. let session = res[0];
  349. let role = res[1];
  350. let lastFetchTime = res[2];
  351. let users = res[3];
  352. let sessionName = "";
  353. let otheruserId = "";
  354. if (session.type == SESSION_TYPES.P2P) {
  355. for (let j in users) {
  356. if (users[j] != userId) {
  357. otheruserId = users[j];
  358. }
  359. }
  360. }
  361. if (!role) role = 0;
  362. if (!lastFetchTime) lastFetchTime = new Date().getTime();
  363. // 计算未读消息数
  364. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  365. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
  366. .then(function (count) {
  367. if (!otheruserId) otheruserId = userId;
  368. ParticipantRepo.findNameById(otheruserId, function (err, res) {
  369. if ((res && res.length == 0) || session.type != SESSION_TYPES.P2P) {
  370. sessionName = session.name;
  371. } else {
  372. sessionName = res[0].name;
  373. }
  374. sessionList.push({
  375. id: sessionId,
  376. name: sessionName,
  377. create_date: session.create_date,
  378. last_content_type: session.last_content_type,
  379. last_content: session.last_content,
  380. sender_id: session.sender_id,
  381. type: session.type,
  382. sender_name: session.sender_name,
  383. unread_count: count,
  384. business_type: session.business_type,
  385. my_role: role
  386. });
  387. if (sessionId === sessionIds[sessionIds.length - 1]) {
  388. ModelUtil.emitOK(self.eventEmitter, sessionList);
  389. }
  390. })
  391. })
  392. }).catch(function (err) {
  393. logger.error("Get sessions failed: ", ex);
  394. ModelUtil.emitError(self.eventEmitter, "Get sessions failed: " + err);
  395. });
  396. });
  397. }
  398. ]);
  399. }
  400. /**
  401. * 获取会话消息。全部,不管已读/未读状态。
  402. *
  403. * @param sessionId 会话ID
  404. * @param userId 拉取消息的人
  405. * @param page 第几页
  406. * @param pagesize 分页数量
  407. * @param start_msg_id 消息会话最新的一条消息的ID
  408. * @param end_msg_id 消息会话刚开始的消息ID
  409. */
  410. getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, handler) {
  411. let self = this;
  412. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  413. if (!start_msg_id && !end_msg_id) {
  414. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  415. if (res.length == 0) {
  416. if (handler) {
  417. handler(null, res);
  418. return;
  419. }
  420. ModelUtil.emitOK(self.eventEmitter, res);
  421. return;
  422. }
  423. start_msg_id = res[0];
  424. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  425. if (res.length == 0) {
  426. if (handler) {
  427. handler(null, res);
  428. return;
  429. }
  430. ModelUtil.emitOK(self.eventEmitter, res);
  431. return;
  432. }
  433. end_msg_id = res[0];
  434. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  435. if (err) {
  436. if (handler) {
  437. handler(err, null);
  438. return;
  439. }
  440. logger.error("getMessagesByPage error" + err);
  441. ModelUtil.emitError(self.eventEmitter, err, err);
  442. } else {
  443. if (handler) {
  444. handler(null, res);
  445. return;
  446. }
  447. ModelUtil.emitOK(self.eventEmitter, res);
  448. }
  449. })
  450. })
  451. })
  452. } else if (!start_msg_id) {
  453. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  454. if (res.length == 0) {
  455. if (handler) {
  456. handler(null, res);
  457. return;
  458. }
  459. ModelUtil.emitOK(self.eventEmitter, res);
  460. return;
  461. }
  462. start_msg_id = res[0];
  463. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  464. if (err) {
  465. if (handler) {
  466. handler(err, null);
  467. return;
  468. }
  469. logger.error("getMessagesByPage error" + err);
  470. ModelUtil.emitError(self.eventEmitter, err, err);
  471. } else {
  472. if (handler) {
  473. handler(null, res);
  474. return;
  475. }
  476. ModelUtil.emitOK(self.eventEmitter, res);
  477. }
  478. })
  479. })
  480. } else if (!end_msg_id) {
  481. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  482. if (res.length == 0) {
  483. ModelUtil.emitOK(self.eventEmitter, res);
  484. return;
  485. }
  486. end_msg_id = res[0];
  487. self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, function (err, res) {
  488. if (err) {
  489. if (handler) {
  490. handler(err, null);
  491. return;
  492. }
  493. logger.error("getMessagesByPage error" + err);
  494. ModelUtil.emitError(self.eventEmitter, err, err);
  495. } else {
  496. if (handler) {
  497. handler(null, res);
  498. return;
  499. }
  500. ModelUtil.emitOK(self.eventEmitter, res);
  501. }
  502. })
  503. })
  504. } else {
  505. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  506. if (err) {
  507. if (handler) {
  508. handler(err, null);
  509. return;
  510. }
  511. logger.error("getMessagesByPage error" + err);
  512. ModelUtil.emitError(self.eventEmitter, err, err);
  513. } else {
  514. if (handler) {
  515. handler(null, res);
  516. return;
  517. }
  518. ModelUtil.emitOK(self.eventEmitter, res);
  519. }
  520. })
  521. }
  522. }
  523. /**
  524. * 分页获取会话消息。
  525. *
  526. * @param sessionId 必选。会话ID
  527. * @param userId 必选。用户ID
  528. * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据
  529. * @param endMsgId 必选。会话中的结束消息ID
  530. * @param page 必选。页码
  531. * @param size 必选。页面大小
  532. * @param handler 必选。回调
  533. */
  534. getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
  535. let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  536. let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  537. let participants = new Participants();
  538. let offset = (page - 1 < 0 ? 0 : page - 1) * size;
  539. let count = size;
  540. if (page > 1 || isoffset == 1) {
  541. offset += 1; // 翻页由于闭区间,需跳过本身数据
  542. }
  543. participants.existsParticipant(sessionId, userId, function (err, res) {
  544. if (!res) {
  545. handler(Error("User not found in session " + sessionId), null);
  546. } else {
  547. //将消息ID转换成分值
  548. redis.multi()
  549. .zscore(messagesTimestampKey, startMsgId)
  550. .zscore(messagesTimestampKey, endMsgId)
  551. .execAsync()
  552. .then(function (res) {
  553. let startMsgScore = res[1];
  554. let endMsgScore = res[0];
  555. if (startMsgScore == null || endMsgScore == null || startMsgScore == endMsgScore) {
  556. handler(null, []);
  557. return;
  558. }
  559. // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
  560. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  561. .then(function (res) {
  562. if (res.length == 0) {
  563. handler(null, []);
  564. return;
  565. }
  566. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  567. handler(null, messages);
  568. }).then(function () {
  569. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  570. })
  571. })
  572. .catch(function (err) {
  573. logger.error("Get message by page failed: ", err);
  574. handler(err, false);
  575. })
  576. })
  577. }
  578. })
  579. }
  580. /**
  581. * 获取所有会话的未读消息数。
  582. */
  583. getAllSessionsUnreadMessageCount(userId) {
  584. let self = this;
  585. let count = 0;
  586. SessionRepo.findAll(userId, function (err, res) {
  587. if (err) {
  588. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  589. return;
  590. }
  591. if (res.length == 0) {
  592. ModelUtil.emitOK(self.eventEmitter, {count: count});
  593. return;
  594. }
  595. for (let j in res) {
  596. if (res[j].type == SESSION_TYPES.SYSTEM) {
  597. if (j == res.length - 1) {
  598. ModelUtil.emitOK(self.eventEmitter, {count: count});
  599. }
  600. continue;
  601. }
  602. callback(res, j, res[j]);
  603. }
  604. });
  605. function callback(res, j, session) {
  606. self.getSessionUnreadMessageCount(res[j].id, userId, function (err, con) {
  607. if (err) {
  608. ModelUtil.logError("getAllSessionsUnreadMessageCount is failed", err);
  609. }
  610. count = count + con;
  611. if (j == res.length - 1) {
  612. ModelUtil.emitOK(self.eventEmitter, {count: count});
  613. }
  614. })
  615. }
  616. }
  617. /**
  618. * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
  619. *
  620. * @param sessionId
  621. * @param userId
  622. */
  623. getSessionUnreadMessageCount(sessionId, userId, handler) {
  624. let self = this;
  625. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  626. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  627. async.waterfall([
  628. // 此成员最后获取消息的时间
  629. function (callback) {
  630. redis.zscoreAsync(participantsKey, userId)
  631. .then(function (lastFetchTime) {
  632. callback(null, lastFetchTime);
  633. })
  634. },
  635. // 计算最后获取消息的时间之后到现在有多少条消息
  636. function (lastFetchTime, callback) {
  637. if (!lastFetchTime) lastFetchTime = 0;
  638. let now = new Date().getTime();
  639. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
  640. .then(function (count) {
  641. if (handler) {
  642. handler(null, count);
  643. } else {
  644. ModelUtil.emitOK(self.eventEmitter, {count: count});
  645. }
  646. })
  647. }
  648. ], function (err, res) {
  649. if (err) {
  650. if (handler) {
  651. handler(err, 0);
  652. } else {
  653. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  654. }
  655. }
  656. });
  657. }
  658. /**
  659. * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
  660. */
  661. getSessionUnreadMessages(sessionId, userId) {
  662. let self = this;
  663. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  664. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  665. async.waterfall([
  666. // 此成员最后获取消息的时间
  667. function (callback) {
  668. redis.zscoreAsync(participantsKey, userId)
  669. .then(function (lastFetchTime) {
  670. callback(null, lastFetchTime);
  671. })
  672. },
  673. // 最后获取消息的时间之后到现在的消息ID列表
  674. function (lastFetchTime, callback) {
  675. if (!lastFetchTime) lastFetchTime = 0;
  676. let now = new Date().getTime();
  677. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
  678. .then(function (messageIds) {
  679. callback(null, messageIds);
  680. })
  681. },
  682. // 获取消息
  683. function (messageIds, callback) {
  684. if (messageIds.length == 0) {
  685. ModelUtil.emitOK(self.eventEmitter, []);
  686. return;
  687. }
  688. let startMsgId = messageIds[0];
  689. let endMsgId = messageIds[messageIds.length - 1];
  690. self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
  691. if (err) {
  692. ModelUtil.emitError(self.eventEmitter, err.message);
  693. return;
  694. }
  695. ModelUtil.emitOK(self.eventEmitter, res);
  696. });
  697. }
  698. ], function (err, res) {
  699. if (err) {
  700. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  701. }
  702. });
  703. }
  704. /**
  705. * 保存消息。
  706. *
  707. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  708. *
  709. * see also: saveMessageByTopic
  710. *
  711. * @param message
  712. * @param sessionId
  713. */
  714. saveMessageBySession(sessionId, message) {
  715. let self = this;
  716. let messages = new Messages();
  717. let participants = new Participants();
  718. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  719. let messageId = mongoose.Types.ObjectId().toString();
  720. message.id = messageId;
  721. // 检查会话中是否存在此成员
  722. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  723. if (err) {
  724. ModelUtil.emitError(self.eventEmitter, "Check session paticipant failed: ", err);
  725. return;
  726. }
  727. if (res) {
  728. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  729. let sessionType = res[0];
  730. let sessionName = res[1];
  731. if (sessionType == null) {
  732. ModelUtil.emitError(self.eventEmitter, "Session with id " + sessionId + " not found.");
  733. return;
  734. }
  735. //消息保存到REDIS
  736. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  737. //更新REDIS最后一条消息
  738. Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
  739. //更新用户最后一次获取消息的时间
  740. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  741. //更新最后一条消息到数据库
  742. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  743. //将消息保存到数据库
  744. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  745. if (err) {
  746. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  747. } else {
  748. message.timestamp = message.timestamp.getTime();
  749. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  750. }
  751. });
  752. }).then(function (res) {
  753. // 推送消息
  754. ParticipantRepo.findIds(sessionId, function (err, res) {
  755. if (err) {
  756. ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
  757. } else {
  758. message.session_id = sessionId;
  759. res.forEach(function (participant) {
  760. if (participant.id !== message.sender_id) {
  761. Sessions.pushNotification(participant.id, message);
  762. }
  763. });
  764. }
  765. })
  766. }).catch(function (err) {
  767. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  768. })
  769. } else {
  770. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  771. }
  772. });
  773. }
  774. sendTopicMessages(topicId, message) {
  775. let self = this;
  776. TopicRepo.findAllByTopicId(topicId, function (err, res) {
  777. if (err || res.length == 0) {
  778. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": "议题获取失败"});
  779. return;
  780. }
  781. self.saveMessageByTopic(message, res[0].session_id, function (err, messageId) {
  782. if (err) {
  783. ModelUtil.emitOK(self.eventEmitter, {status: -1, "message": err});
  784. } else {
  785. message.id = messageId;
  786. ModelUtil.emitOK(self.eventEmitter, {status: 200, "message": "发送成功", data: message});
  787. }
  788. });
  789. });
  790. }
  791. /**
  792. * 保存消息
  793. *
  794. * @param message
  795. * @param sessionId
  796. * @param handler
  797. */
  798. saveMessageByTopic(message, sessionId, handler) {
  799. let messages = new Messages();
  800. let participants = new Participants();
  801. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  802. let messageId = mongoose.Types.ObjectId().toString();
  803. let self = this;
  804. let sessionType = 0;
  805. let sessionName = "";
  806. message.id = messageId;
  807. // 发送成员必须处于会话中
  808. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  809. if (res) {
  810. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  811. sessionType = res[0];
  812. sessionName = res[1];
  813. if (!sessionType || !sessionName) {
  814. logger.error("session is error for key " + session_key);
  815. throw "session is not found";
  816. }
  817. }).then(function (res) {
  818. // 更新消息存储REDIS
  819. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  820. //更新消息存储到mysql
  821. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  822. // 更新会话最新状态及成员最后一次消息获取时间
  823. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  824. //更新最后一条消息
  825. Messages.updateLastContent(session_key, sessionType, sessionName, message);
  826. //更新session实体的最后一条消息
  827. SessionRepo.updateSessionLastStatus(message.sender_id, message.sender_name, message.timestamp, message.content, message.content_type, sessionId);
  828. handler(null, messageId);
  829. }).then(function (res) {
  830. // 推送消息
  831. ParticipantRepo.findIds(sessionId, function (err, res) {
  832. if (err) {
  833. handler(err, messageId)
  834. } else {
  835. message.session_id = sessionId;
  836. res.forEach(function (participant) {
  837. if (participant.id !== message.sender_id) {
  838. Sessions.pushNotification(participant.id, message);
  839. }
  840. });
  841. }
  842. })
  843. }).catch(function (err) {
  844. handler(err, messageId)
  845. })
  846. } else {
  847. handler("用户不在此会话当中!", messageId);
  848. }
  849. })
  850. }
  851. /**
  852. * 置顶操作
  853. */
  854. stickSession(sessionId, user) {
  855. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  856. let self = this;
  857. //取出最大的session
  858. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  859. //获取该session的时间搓
  860. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  861. let nowtime = new Date().getTime();
  862. //当前时间搓比redis的时间搓更早证明没有置顶过
  863. if (scoreres <= nowtime) {
  864. //初始化置顶
  865. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  866. logger.info("stickSession:" + sessionId + ",res:" + res);
  867. ModelUtil.emitOK(self.eventEmitter, {});
  868. }).then(function () {
  869. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  870. })
  871. } else {
  872. //已有置顶的数据,取出来加1保存回去
  873. scoreres = Number(scoreres) + 1;
  874. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  875. logger.info("stickSession:" + sessionId + ",res:" + res);
  876. ModelUtil.emitOK(self.eventEmitter, {});
  877. }).then(function () {
  878. SessionRepo.saveStickySession(sessionId, user, scoreres);
  879. })
  880. }
  881. })
  882. })
  883. }
  884. /**
  885. * 取消会话置顶
  886. */
  887. cancelStickSession(sessionId, user) {
  888. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  889. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  890. let self = this;
  891. redis.zscoreAsync(participants_key, user).then(function (res) {
  892. if (!res) {
  893. res = new Date().getTime();
  894. }
  895. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  896. logger.info("cancelStickSession:" + sessionId);
  897. ModelUtil.emitOK(self.eventEmitter, {});
  898. }).then(function () {
  899. SessionRepo.unstickSession(sessionId, user);
  900. });
  901. })
  902. }
  903. /**
  904. * 更新会话参与者的最后消息获取时间。
  905. *
  906. * @param sessionId
  907. * @param userId
  908. */
  909. static updateParticipantLastFetchTime(sessionId, userId, score) {
  910. score = score + 1;
  911. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  912. redis.zaddAsync(participantsKey, score, userId)
  913. .then(function (res) {
  914. ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) {
  915. if (err) {
  916. logger.error("Update participant last fetch time failed: ", err);
  917. }
  918. });
  919. })
  920. .catch(function (err) {
  921. logger.error("Update participant last fetch time failed: ", err);
  922. });
  923. }
  924. /**
  925. * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。
  926. *
  927. * @param targetUserId
  928. * @param message
  929. */
  930. static pushNotification(targetUserId, message) {
  931. Users.isPatientId(targetUserId, function (err, isPatient) {
  932. if (isPatient) {
  933. WechatClient.sendMessage(targetUserId, message);
  934. }
  935. else {
  936. AppClient.sendNotification(targetUserId, message);
  937. }
  938. });
  939. }
  940. }
  941. // Expose class
  942. module.exports = Sessions;