sessions.js 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715
  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Users = require('../user/users');
  10. let Participants = require('./Participants');
  11. let SessionRepo = require('../../repository/mysql/session.repo');
  12. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  13. let WechatClient = require("../client/wechat.client.js");
  14. let AppClient = require("../client/app.client.js");
  15. let configFile = require('../../include/commons').CONFIG_FILE;
  16. let config = require('../../resources/config/' + configFile);
  17. let redis = RedisClient.redisClient().connection;
  18. let logger = require('../../util/log.js');
  19. let mongoose = require('mongoose');
  20. let async = require("async");
  21. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  22. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  23. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  24. const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
  25. class Sessions extends RedisModel {
  26. constructor() {
  27. super();
  28. }
  29. /**
  30. * 创建会话。会话的ID来源:
  31. * MUC:患者的ID
  32. * P2P:对成员的ID排序后,取hash值
  33. * GROUP:团队的ID
  34. *
  35. * @param sessionId
  36. * @param name 会话名称
  37. * @param type 会话类型
  38. * @param participantArray 会话成员
  39. * @param handler 回调,仅MUC模式使用
  40. */
  41. createSession(sessionId, name, type, participantArray, handler) {
  42. let self = this;
  43. if (type == SESSION_TYPES.P2P||type==SESSION_TYPES.SYSTEM) {
  44. var participantIdArray = [];
  45. for (let i in participantArray) {
  46. participantIdArray.push(participantArray[i].split(":")[0]);
  47. }
  48. if (participantIdArray.length != 2) {
  49. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "P2P session only allow 2 participants."});
  50. return false;
  51. }
  52. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  53. sessionId = res;
  54. callBusinessType(sessionId);
  55. });
  56. } else {
  57. callBusinessType(sessionId);
  58. }
  59. var businessType = SESSION_BUSINESS_TYPE.DOCTOR;
  60. function callBusinessType(sessionId) {
  61. for (var j = 0; j < participantArray.length; j++)
  62. callIsPatient(j, participantArray.length);
  63. }
  64. function callIsPatient(j, length) {
  65. Users.isPatientId(participantArray[j], function (isPatient) {
  66. if (isPatient) {
  67. businessType = SESSION_BUSINESS_TYPE.PATIENT
  68. }
  69. if (length - 1 == j) {
  70. callCreate(sessionId, businessType);
  71. }
  72. })
  73. }
  74. function callCreate(sessionId, businessType) {
  75. SessionRepo.findOne(sessionId, function (err, res) {
  76. if (res.length > 0) {
  77. let session = res[0];
  78. ModelUtil.emitOK(self.eventEmitter, {
  79. id: session.id,
  80. name: session.name,
  81. type: session.type,
  82. business_type: session.business_type || businessType,
  83. create_date: session.create_date
  84. });
  85. return;
  86. }
  87. let createDate = new Date();
  88. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  89. // 保存会话及成员至MySQL中
  90. self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
  91. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  92. if (err) {
  93. ModelUtil.emitError(self.eventEmitter, err.message);
  94. return;
  95. }
  96. let message = {
  97. sender_id: "System",
  98. sender_name: "System",
  99. content_type: 1,
  100. content: "",
  101. timestamp: createDate
  102. };
  103. Messages.updateLastContent(sessionKey, type, name, message);
  104. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  105. if (handler) {
  106. handler(true, sessionId);
  107. } else {
  108. ModelUtil.emitOK(self.eventEmitter, {id: sessionId});
  109. }
  110. });
  111. });
  112. });
  113. });
  114. }
  115. }
  116. /**
  117. * 保存session到MySQL
  118. * @param sessionId
  119. * @param name
  120. * @param type
  121. * @param createDate
  122. * @param handler
  123. */
  124. saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {
  125. SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler);
  126. }
  127. /**
  128. * 获取某个用户的全部session列表
  129. * @param userId
  130. * @param handler
  131. */
  132. getUserSessionsFromMysql(userId, handler) {
  133. SessionRepo.findAll(userId, handler);
  134. }
  135. /**
  136. * 获取session单个对象
  137. * @param sessionId
  138. * @param handler
  139. */
  140. getSessions(sessionId, handler) {
  141. SessionRepo.findOne(sessionId, handler);
  142. }
  143. /**
  144. * 根据用户ID获取用户的session列表
  145. * @param userId
  146. * @param page
  147. * @param size
  148. */
  149. getUserSessions(userId, page, size, businessType) {
  150. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  151. let self = this;
  152. if (page > 0) {
  153. if (page == 1) {
  154. page = 0;
  155. }
  156. page = page + page * size;
  157. }
  158. async.waterfall([
  159. // 获取会话ID列表
  160. function (callback) {
  161. redis.zrevrangeAsync(userSessionKey, page, size)
  162. .then(function (sessionIds) {
  163. if (sessionIds.length == 0) {
  164. ModelUtil.emitOK(self.eventEmitter, []);
  165. return;
  166. }
  167. callback(null, sessionIds);
  168. })
  169. },
  170. // 遍历会话
  171. function (sessionIds, callback) {
  172. let sessionList = [];
  173. sessionIds.forEach(function (sessionId) {
  174. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  175. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  176. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  177. redis.multi()
  178. .hgetall(sessionKey) // 会话实体
  179. .hget(participantsRoleKey, userId) // 用户在此会话中的角色
  180. .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
  181. .execAsync()
  182. .then(function (res) {
  183. let session = res[0];
  184. let role = res[1];
  185. let lastFetchTime = res[2];
  186. if(!lastFetchTime)lastFetchTime = new Date().getTime();
  187. // 计算未读消息数
  188. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  189. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
  190. .then(function (count) {
  191. if(businessType&&businessType!= session.business_type){
  192. }else{
  193. sessionList.push({
  194. id: sessionId,
  195. name: session.name,
  196. create_date: session.create_date,
  197. last_content_type: session.last_content_type,
  198. last_content: session.last_content,
  199. sender_id: session.sender_id,
  200. type: session.type,
  201. sender_name: session.sender_name,
  202. unread_count: count,
  203. business_type: session.business_type,
  204. my_role: role
  205. });
  206. }
  207. if (sessionId == sessionIds[sessionIds.length - 1]) {
  208. ModelUtil.emitOK(self.eventEmitter, sessionList);
  209. }
  210. })
  211. })
  212. .catch(function (err) {
  213. ModelUtil.emitError(self.eventEmitter, "Get sessions failed: " + err);
  214. });
  215. });
  216. }
  217. ]);
  218. }
  219. /**
  220. * 获取会话消息。全部,不管已读/未读状态。
  221. *
  222. * @param sessionId 会话ID
  223. * @param userId 拉取消息的人
  224. * @param page 第几页
  225. * @param pagesize 分页数量
  226. * @param start_msg_id 消息会话最新的一条消息的ID
  227. * @param end_msg_id 消息会话刚开始的消息ID
  228. */
  229. getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset) {
  230. let self = this;
  231. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  232. if (!start_msg_id && !end_msg_id) {
  233. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  234. if (res.length == 0) {
  235. ModelUtil.emitOK(self.eventEmitter, res);
  236. return;
  237. }
  238. start_msg_id = res[0];
  239. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  240. if (res.length == 0) {
  241. ModelUtil.emitOK(self.eventEmitter, res);
  242. return;
  243. }
  244. end_msg_id = res[0];
  245. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  246. if (err) {
  247. logger.error("getMessagesByPage error" + err);
  248. ModelUtil.emitError(self.eventEmitter, err, err);
  249. } else {
  250. ModelUtil.emitOK(self.eventEmitter, res);
  251. }
  252. })
  253. })
  254. })
  255. } else if (!start_msg_id) {
  256. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  257. if (res.length == 0) {
  258. ModelUtil.emitOK(self.eventEmitter, res);
  259. return;
  260. }
  261. start_msg_id = res[0];
  262. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  263. if (err) {
  264. logger.error("getMessagesByPage error" + err);
  265. ModelUtil.emitError(self.eventEmitter, err, err);
  266. } else {
  267. ModelUtil.emitOK(self.eventEmitter, res);
  268. }
  269. })
  270. })
  271. } else if (!end_msg_id) {
  272. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  273. if (res.length == 0) {
  274. ModelUtil.emitOK(self.eventEmitter, res);
  275. return;
  276. }
  277. end_msg_id = res[0];
  278. self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, function (err, res) {
  279. if (err) {
  280. logger.error("getMessagesByPage error" + err);
  281. ModelUtil.emitError(self.eventEmitter, err, err);
  282. } else {
  283. ModelUtil.emitOK(self.eventEmitter, res);
  284. }
  285. })
  286. })
  287. } else {
  288. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  289. if (err) {
  290. logger.error("getMessagesByPage error" + err);
  291. ModelUtil.emitError(self.eventEmitter, err, err);
  292. } else {
  293. ModelUtil.emitOK(self.eventEmitter, res);
  294. }
  295. })
  296. }
  297. }
  298. /**
  299. * 分页获取会话消息。
  300. *
  301. * @param sessionId 必选。会话ID
  302. * @param userId 必选。用户ID
  303. * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据
  304. * @param endMsgId 必选。会话中的结束消息ID
  305. * @param page 必选。页码
  306. * @param size 必选。页面大小
  307. * @param handler 必选。回调
  308. */
  309. getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
  310. let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  311. let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  312. let participants = new Participants();
  313. let offset = (page - 1 < 0 ? 0 : page - 1) * size;
  314. let count = size;
  315. if (page > 1 || isoffset == 1) {
  316. offset += 1; // 翻页由于闭区间,需跳过本身数据
  317. }
  318. participants.existsParticipant(sessionId, userId, function (err, res) {
  319. if (!res) {
  320. handler(Error("User not found in session " + sessionId), null);
  321. } else {
  322. //将消息ID转换成分值
  323. redis.multi()
  324. .zscore(messagesTimestampKey, startMsgId)
  325. .zscore(messagesTimestampKey, endMsgId)
  326. .execAsync()
  327. .then(function (res) {
  328. let startMsgScore = res[1];
  329. let endMsgScore = res[0];
  330. if (startMsgScore == null || endMsgScore == null || startMsgScore == endMsgScore) {
  331. handler(null, []);
  332. return;
  333. }
  334. // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
  335. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  336. .then(function (res) {
  337. if (res.length == 0) {
  338. handler(null, []);
  339. return;
  340. }
  341. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  342. handler(null, messages);
  343. }).then(function () {
  344. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  345. })
  346. })
  347. .catch(function (res) {
  348. handler(res, false);
  349. })
  350. })
  351. }
  352. })
  353. }
  354. /**
  355. * 获取所有会话的未读消息数。
  356. */
  357. getAllSessionsUnreadMessageCount(userId) {
  358. let self = this;
  359. ModelUtil.emitError(self.eventEmitter, {message: "not implemented."}, null);
  360. }
  361. /**
  362. * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
  363. *
  364. * @param sessionId
  365. * @param userId
  366. */
  367. getSessionUnreadMessageCount(sessionId, userId,handler) {
  368. let self = this;
  369. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  370. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  371. async.waterfall([
  372. // 此成员最后获取消息的时间
  373. function (callback) {
  374. redis.zscoreAsync(participantsKey, userId)
  375. .then(function (lastFetchTime) {
  376. callback(null, lastFetchTime);
  377. })
  378. },
  379. // 计算最后获取消息的时间之后到现在有多少条消息
  380. function (lastFetchTime, callback) {
  381. if (!lastFetchTime) lastFetchTime = 0;
  382. let now = new Date().getTime();
  383. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
  384. .then(function (count) {
  385. if(handler){
  386. handler(null,count);
  387. }else{
  388. ModelUtil.emitOK(self.eventEmitter, {count: count});
  389. }
  390. })
  391. }
  392. ], function (err, res) {
  393. if (err) {
  394. if(handler){
  395. handler(err,0);
  396. }else{
  397. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  398. }
  399. }
  400. });
  401. }
  402. /**
  403. * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
  404. */
  405. getSessionUnreadMessages(sessionId, userId) {
  406. let self = this;
  407. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  408. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  409. async.waterfall([
  410. // 此成员最后获取消息的时间
  411. function (callback) {
  412. redis.zscoreAsync(participantsKey, userId)
  413. .then(function (lastFetchTime) {
  414. callback(null, lastFetchTime);
  415. })
  416. },
  417. // 最后获取消息的时间之后到现在的消息ID列表
  418. function (lastFetchTime, callback) {
  419. if (!lastFetchTime) lastFetchTime = 0;
  420. let now = new Date().getTime();
  421. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
  422. .then(function (messageIds) {
  423. callback(null, messageIds);
  424. })
  425. },
  426. // 获取消息
  427. function (messageIds, callback) {
  428. if (messageIds.length == 0) {
  429. ModelUtil.emitOK(self.eventEmitter, []);
  430. return;
  431. }
  432. let startMsgId = messageIds[0];
  433. let endMsgId = messageIds[messageIds.length - 1];
  434. self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
  435. if (err) {
  436. ModelUtil.emitError(self.eventEmitter, err.message);
  437. return;
  438. }
  439. ModelUtil.emitOK(self.eventEmitter, res);
  440. });
  441. }
  442. ], function (err, res) {
  443. if (err) {
  444. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  445. }
  446. });
  447. }
  448. /**
  449. * 保存消息。
  450. *
  451. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  452. *
  453. * see also: saveMessageByTopic
  454. *
  455. * @param message
  456. * @param sessionId
  457. */
  458. saveMessageBySession(sessionId, message) {
  459. let self = this;
  460. let messages = new Messages();
  461. let participants = new Participants();
  462. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  463. let messageId = mongoose.Types.ObjectId().toString();
  464. message.id = messageId;
  465. // 检查会话中是否存在此成员
  466. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  467. if (err) {
  468. ModelUtil.emitError(self.eventEmitter, "Check session paticipant failed: ", err);
  469. return;
  470. }
  471. if (res) {
  472. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  473. let sessionType = res[0];
  474. if (sessionType == null) {
  475. ModelUtil.emitError(self.eventEmitter, "Session with id " + sessionId + " not found.");
  476. return;
  477. }
  478. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  479. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  480. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  481. if (err) {
  482. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  483. } else {
  484. message.timestamp = message.timestamp.getTime();
  485. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  486. }
  487. });
  488. }).then(function (res) {
  489. // 推送消息
  490. ParticipantRepo.findIds(sessionId, function (err, res) {
  491. if (err) {
  492. ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
  493. } else {
  494. message.session_id = sessionId;
  495. res.forEach(function (participant) {
  496. if (participant.id !== message.sender_id) {
  497. Sessions.pushNotification(participant.id, message);
  498. }
  499. });
  500. }
  501. })
  502. }).catch(function (err) {
  503. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  504. })
  505. } else {
  506. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  507. }
  508. });
  509. }
  510. sendTopicMessages(topicId,message){
  511. let self = this;
  512. let topicKey = RedisModel.makeRedisKey(REDIS_KEYS.Topic, topicId);
  513. redis.hgetallAsync(topicKey).then(function (topic) {
  514. self.saveMessageBySession(topic.session_id,message);
  515. });
  516. }
  517. /**
  518. * 保存消息
  519. *
  520. * @param message
  521. * @param sessionId
  522. * @param handler
  523. */
  524. saveMessageByTopic(message, sessionId, handler) {
  525. let messages = new Messages();
  526. let participants = new Participants();
  527. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  528. let messageId = mongoose.Types.ObjectId().toString();
  529. let self = this;
  530. let sessionType = 0;
  531. let sessionName = "";
  532. message.id = messageId;
  533. // 发送成员必须处于会话中
  534. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  535. if (res) {
  536. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  537. sessionType = res[0];
  538. sessionName = res[1];
  539. if (!sessionType || !sessionName) {
  540. logger.error("session is error for key " + session_key);
  541. throw "session is not found";
  542. }
  543. }).then(function (res) {
  544. // 更新消息存储
  545. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  546. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  547. // 更新会话最新状态及成员最后一次消息获取时间
  548. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  549. Messages.updateLastContent(session_key, sessionType, sessionName, message);
  550. handler(null, messageId);
  551. }).then(function (res) {
  552. // 推送消息
  553. ParticipantRepo.findIds(sessionId, function (err, res) {
  554. if (err) {
  555. ModelUtil.logError("Push message from topic: get participant's id list failed: ", err);
  556. } else {
  557. message.session_id = sessionId;
  558. res.forEach(function (participant) {
  559. if (participant.id !== message.sender_id) {
  560. Sessions.pushNotification(participant.id, message);
  561. }
  562. });
  563. }
  564. })
  565. }).catch(function (err) {
  566. ModelUtil.emitError(self.eventEmitter, "Error occurred while save message to topic: ");
  567. handler(err, messageId)
  568. })
  569. } else {
  570. handler("用户不在此会话当中!", messageId);
  571. }
  572. })
  573. }
  574. /**
  575. * 置顶操作
  576. */
  577. stickSession(sessionId, user) {
  578. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  579. let self = this;
  580. //取出最大的session
  581. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  582. //获取该session的时间搓
  583. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  584. let nowtime = new Date().getTime();
  585. //当前时间搓比redis的时间搓更早证明没有置顶过
  586. if (scoreres <= nowtime) {
  587. //初始化置顶
  588. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  589. logger.info("stickSession:" + sessionId + ",res:" + res);
  590. ModelUtil.emitOK(self.eventEmitter, {});
  591. }).then(function () {
  592. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  593. })
  594. } else {
  595. //已有置顶的数据,取出来加1保存回去
  596. scoreres = Number(scoreres) + 1;
  597. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  598. logger.info("stickSession:" + sessionId + ",res:" + res);
  599. ModelUtil.emitOK(self.eventEmitter, {});
  600. }).then(function () {
  601. SessionRepo.saveStickySession(sessionId, user, scoreres);
  602. })
  603. }
  604. })
  605. })
  606. }
  607. /**
  608. * 取消会话置顶
  609. */
  610. cancelStickSession(sessionId, user) {
  611. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  612. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  613. let self = this;
  614. redis.zscoreAsync(participants_key, user).then(function (res) {
  615. if (!res) {
  616. res = new Date().getTime();
  617. }
  618. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  619. logger.info("cancelStickSession:" + sessionId);
  620. ModelUtil.emitOK(self.eventEmitter, {});
  621. }).then(function () {
  622. SessionRepo.unstickSession(sessionId, user);
  623. });
  624. })
  625. }
  626. /**
  627. * 更新会话参与者的最后消息获取时间。
  628. *
  629. * @param sessionId
  630. * @param userId
  631. */
  632. static updateParticipantLastFetchTime(sessionId, userId, score) {
  633. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  634. redis.zaddAsync(participantsKey, score, userId)
  635. .then(function (res) {
  636. ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) {
  637. if (err) {
  638. logger.error("Update participant last fetch time failed: ", err);
  639. }
  640. });
  641. })
  642. .catch(function (err) {
  643. logger.error("Update participant last fetch time failed: ", err);
  644. });
  645. }
  646. /**
  647. * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。
  648. *
  649. * @param targetUserId
  650. * @param message
  651. */
  652. static pushNotification(targetUserId, message) {
  653. Users.isPatientId(targetUserId, function (err, isPatient) {
  654. if (isPatient) {
  655. WechatClient.sendMessage(targetUserId, message);
  656. }
  657. else {
  658. AppClient.sendNotification(targetUserId, message);
  659. }
  660. });
  661. }
  662. }
  663. // Expose class
  664. module.exports = Sessions;