sessions.js 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Users = require('../user/users');
  10. let Participants = require('./Participants');
  11. let SessionRepo = require('../../repository/mysql/session.repo');
  12. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  13. let WechatClient = require("../client/wechat.client.js");
  14. let AppClient = require("../client/app.client.js");
  15. let configFile = require('../../include/commons').CONFIG_FILE;
  16. let config = require('../../resources/config/' + configFile);
  17. let redis = RedisClient.redisClient().connection;
  18. let logger = require('../../util/log.js');
  19. let mongoose = require('mongoose');
  20. let async = require("async");
  21. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  22. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  23. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  24. const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
  25. class Sessions extends RedisModel {
  26. constructor() {
  27. super();
  28. }
  29. /**
  30. * 创建会话。会话的ID来源:
  31. * MUC:患者的ID
  32. * P2P:对成员的ID排序后,取hash值
  33. * GROUP:团队的ID
  34. *
  35. * @param sessionId
  36. * @param name 会话名称
  37. * @param type 会话类型
  38. * @param participantArray 会话成员
  39. * @param handler 回调,仅MUC模式使用
  40. */
  41. createSession(sessionId, name, type, participantArray, handler) {
  42. let self = this;
  43. if (type == SESSION_TYPES.P2P||type==SESSION_TYPES.SYSTEM) {
  44. var participantIdArray = [];
  45. for (let i in participantArray) {
  46. participantIdArray.push(participantArray[i].split(":")[0]);
  47. }
  48. if (participantIdArray.length != 2) {
  49. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "P2P session only allow 2 participants."});
  50. return false;
  51. }
  52. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  53. sessionId = res;
  54. callBusinessType(sessionId);
  55. });
  56. } else {
  57. callBusinessType(sessionId);
  58. }
  59. var businessType = SESSION_BUSINESS_TYPE.DOCTOR;
  60. function callBusinessType(sessionId) {
  61. for (var j = 0; j < participantArray.length; j++)
  62. callIsPatient(j, participantArray.length);
  63. }
  64. function callIsPatient(j, length) {
  65. Users.isPatientId(participantArray[j], function (isPatient) {
  66. if (isPatient) {
  67. businessType = SESSION_BUSINESS_TYPE.PATIENT
  68. }
  69. if (length - 1 == j || businessType == SESSION_BUSINESS_TYPE.PATIENT) {
  70. callCreate(sessionId, businessType);
  71. }
  72. })
  73. }
  74. function callCreate(sessionId, businessType) {
  75. SessionRepo.findOne(sessionId, function (err, res) {
  76. if (res.length > 0) {
  77. let session = res[0];
  78. ModelUtil.emitOK(self.eventEmitter, {
  79. id: session.id,
  80. name: session.name,
  81. type: session.type,
  82. business_type: session.business_type || businessType,
  83. create_date: session.create_date
  84. });
  85. return;
  86. }
  87. let createDate = new Date();
  88. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  89. // 保存会话及成员至MySQL中
  90. self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
  91. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  92. if (err) {
  93. ModelUtil.emitError(self.eventEmitter, err.message);
  94. return;
  95. }
  96. let message = {
  97. sender_id: "System",
  98. sender_name: "System",
  99. content_type: 1,
  100. content: "",
  101. timestamp: createDate
  102. };
  103. Messages.updateLastContent(sessionKey, type, name, message);
  104. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  105. if (handler) {
  106. handler(true, sessionId);
  107. } else {
  108. ModelUtil.emitOK(self.eventEmitter, {id: sessionId});
  109. }
  110. });
  111. });
  112. });
  113. });
  114. }
  115. }
  116. /**
  117. * 保存session到MySQL
  118. * @param sessionId
  119. * @param name
  120. * @param type
  121. * @param createDate
  122. * @param handler
  123. */
  124. saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {
  125. SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler);
  126. }
  127. /**
  128. * 获取某个用户的全部session列表
  129. * @param userId
  130. * @param handler
  131. */
  132. getUserSessionsFromMysql(userId, handler) {
  133. SessionRepo.findAll(userId, handler);
  134. }
  135. /**
  136. * 获取session单个对象
  137. * @param sessionId
  138. * @param handler
  139. */
  140. getSessions(sessionId, handler) {
  141. SessionRepo.findOne(sessionId, handler);
  142. }
  143. /**
  144. * 根据用户ID获取用户的session列表
  145. * @param userId
  146. * @param page
  147. * @param size
  148. */
  149. getUserSessions(userId, page, size, businessType) {
  150. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  151. let self = this;
  152. if (page > 0) {
  153. if (page == 1) {
  154. page = 0;
  155. }
  156. page = page + page * size;
  157. }
  158. async.waterfall([
  159. // 获取会话ID列表
  160. function (callback) {
  161. redis.zrevrangeAsync(userSessionKey, page, size)
  162. .then(function (sessionIds) {
  163. if (sessionIds.length == 0) {
  164. ModelUtil.emitOK(self.eventEmitter, []);
  165. return;
  166. }
  167. callback(null, sessionIds);
  168. })
  169. },
  170. // 遍历会话
  171. function (sessionIds, callback) {
  172. let sessionList = [];
  173. sessionIds.forEach(function (sessionId) {
  174. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  175. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  176. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  177. redis.multi()
  178. .hgetall(sessionKey) // 会话实体
  179. .hget(participantsRoleKey, userId) // 用户在此会话中的角色
  180. .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
  181. .execAsync()
  182. .then(function (res) {
  183. let session = res[0];
  184. let role = res[1];
  185. let lastFetchTime = res[2];
  186. if (businessType && businessType != session.business_type) {
  187. logger.info("businessType:" + businessType + "<>" + session.business_type);
  188. } else {
  189. // 计算未读消息数
  190. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  191. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
  192. .then(function (count) {
  193. sessionList.push({
  194. id: sessionId,
  195. name: session.name,
  196. create_date: session.create_date,
  197. last_content_type: session.last_content_type,
  198. last_content: session.last_content,
  199. sender_id: session.sender_id,
  200. type: session.type,
  201. sender_name: session.sender_name,
  202. unread_count: count,
  203. business_type: session.business_type,
  204. my_role: role
  205. });
  206. if (sessionId === sessionIds[sessionIds.length - 1]) {
  207. ModelUtil.emitOK(self.eventEmitter, sessionList);
  208. }
  209. })
  210. }
  211. ;
  212. })
  213. .catch(function (err) {
  214. ModelUtil.emitError(self.eventEmitter, "Get sessions failed: " + err);
  215. });
  216. });
  217. }
  218. ]);
  219. }
  220. /**
  221. * 获取会话消息。全部,不管已读/未读状态。
  222. *
  223. * @param sessionId 会话ID
  224. * @param userId 拉取消息的人
  225. * @param page 第几页
  226. * @param pagesize 分页数量
  227. * @param start_msg_id 消息会话最新的一条消息的ID
  228. * @param end_msg_id 消息会话刚开始的消息ID
  229. */
  230. getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset) {
  231. let self = this;
  232. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  233. if (!start_msg_id && !end_msg_id) {
  234. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  235. if (res.length == 0) {
  236. ModelUtil.emitOK(self.eventEmitter, res);
  237. return;
  238. }
  239. start_msg_id = res[0];
  240. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  241. if (res.length == 0) {
  242. ModelUtil.emitOK(self.eventEmitter, res);
  243. return;
  244. }
  245. end_msg_id = res[0];
  246. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  247. if (err) {
  248. logger.error("getMessagesByPage error" + err);
  249. ModelUtil.emitError(self.eventEmitter, err, err);
  250. } else {
  251. ModelUtil.emitOK(self.eventEmitter, res);
  252. }
  253. })
  254. })
  255. })
  256. } else if (!start_msg_id) {
  257. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  258. if (res.length == 0) {
  259. ModelUtil.emitOK(self.eventEmitter, res);
  260. return;
  261. }
  262. start_msg_id = res[0];
  263. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  264. if (err) {
  265. logger.error("getMessagesByPage error" + err);
  266. ModelUtil.emitError(self.eventEmitter, err, err);
  267. } else {
  268. ModelUtil.emitOK(self.eventEmitter, res);
  269. }
  270. })
  271. })
  272. } else if (!end_msg_id) {
  273. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  274. if (res.length == 0) {
  275. ModelUtil.emitOK(self.eventEmitter, res);
  276. return;
  277. }
  278. end_msg_id = res[0];
  279. self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, function (err, res) {
  280. if (err) {
  281. logger.error("getMessagesByPage error" + err);
  282. ModelUtil.emitError(self.eventEmitter, err, err);
  283. } else {
  284. ModelUtil.emitOK(self.eventEmitter, res);
  285. }
  286. })
  287. })
  288. } else {
  289. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  290. if (err) {
  291. logger.error("getMessagesByPage error" + err);
  292. ModelUtil.emitError(self.eventEmitter, err, err);
  293. } else {
  294. ModelUtil.emitOK(self.eventEmitter, res);
  295. }
  296. })
  297. }
  298. }
  299. /**
  300. * 分页获取会话消息。
  301. *
  302. * @param sessionId 必选。会话ID
  303. * @param userId 必选。用户ID
  304. * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据
  305. * @param endMsgId 必选。会话中的结束消息ID
  306. * @param page 必选。页码
  307. * @param size 必选。页面大小
  308. * @param handler 必选。回调
  309. */
  310. getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
  311. let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  312. let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  313. let participants = new Participants();
  314. let offset = (page - 1 < 0 ? 0 : page - 1) * size;
  315. let count = size;
  316. if (page > 1 || isoffset == 1) {
  317. offset += 1; // 翻页由于闭区间,需跳过本身数据
  318. }
  319. participants.existsParticipant(sessionId, userId, function (err, res) {
  320. if (!res) {
  321. handler(Error("User not found in session " + sessionId), null);
  322. } else {
  323. //将消息ID转换成分值
  324. redis.multi()
  325. .zscore(messagesTimestampKey, startMsgId)
  326. .zscore(messagesTimestampKey, endMsgId)
  327. .execAsync()
  328. .then(function (res) {
  329. let startMsgScore = res[1];
  330. let endMsgScore = res[0];
  331. if (startMsgScore == null || endMsgScore == null || startMsgScore == endMsgScore) {
  332. handler(null, []);
  333. return;
  334. }
  335. // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
  336. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  337. .then(function (res) {
  338. if (res.length == 0) {
  339. handler(null, []);
  340. return;
  341. }
  342. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  343. handler(null, messages);
  344. }).then(function () {
  345. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  346. })
  347. })
  348. .catch(function (res) {
  349. handler(res, false);
  350. })
  351. })
  352. }
  353. })
  354. }
  355. /**
  356. * 获取所有会话的未读消息数。
  357. */
  358. getAllSessionsUnreadMessageCount(userId) {
  359. let self = this;
  360. ModelUtil.emitError(self.eventEmitter, {message: "not implemented."}, null);
  361. }
  362. /**
  363. * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
  364. *
  365. * @param sessionId
  366. * @param userId
  367. */
  368. getSessionUnreadMessageCount(sessionId, userId) {
  369. let self = this;
  370. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  371. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  372. async.waterfall([
  373. // 此成员最后获取消息的时间
  374. function (callback) {
  375. redis.zscoreAsync(participantsKey, userId)
  376. .then(function (lastFetchTime) {
  377. callback(null, lastFetchTime);
  378. })
  379. },
  380. // 计算最后获取消息的时间之后到现在有多少条消息
  381. function (lastFetchTime, callback) {
  382. if (!lastFetchTime) lastFetchTime = 0;
  383. let now = new Date().getTime();
  384. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
  385. .then(function (count) {
  386. ModelUtil.emitOK(self.eventEmitter, {count: count});
  387. })
  388. }
  389. ], function (err, res) {
  390. if (err) {
  391. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  392. }
  393. });
  394. }
  395. /**
  396. * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
  397. */
  398. getSessionUnreadMessages(sessionId, userId) {
  399. let self = this;
  400. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  401. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  402. async.waterfall([
  403. // 此成员最后获取消息的时间
  404. function (callback) {
  405. redis.zscoreAsync(participantsKey, userId)
  406. .then(function (lastFetchTime) {
  407. callback(null, lastFetchTime);
  408. })
  409. },
  410. // 最后获取消息的时间之后到现在的消息ID列表
  411. function (lastFetchTime, callback) {
  412. if (!lastFetchTime) lastFetchTime = 0;
  413. let now = new Date().getTime();
  414. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
  415. .then(function (messageIds) {
  416. callback(null, messageIds);
  417. })
  418. },
  419. // 获取消息
  420. function (messageIds, callback) {
  421. if (messageIds.length == 0) {
  422. ModelUtil.emitOK(self.eventEmitter, []);
  423. return;
  424. }
  425. let startMsgId = messageIds[0];
  426. let endMsgId = messageIds[messageIds.length - 1];
  427. self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
  428. if (err) {
  429. ModelUtil.emitError(self.eventEmitter, err.message);
  430. return;
  431. }
  432. ModelUtil.emitOK(self.eventEmitter, res);
  433. });
  434. }
  435. ], function (err, res) {
  436. if (err) {
  437. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  438. }
  439. });
  440. }
  441. /**
  442. * 保存消息。
  443. *
  444. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  445. *
  446. * see also: saveMessageByTopic
  447. *
  448. * @param message
  449. * @param sessionId
  450. */
  451. saveMessageBySession(sessionId, message) {
  452. let self = this;
  453. let messages = new Messages();
  454. let participants = new Participants();
  455. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  456. let messageId = mongoose.Types.ObjectId().toString();
  457. message.id = messageId;
  458. // 检查会话中是否存在此成员
  459. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  460. if (err) {
  461. ModelUtil.emitError(self.eventEmitter, "Check session paticipant failed: ", err);
  462. return;
  463. }
  464. if (res) {
  465. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  466. let sessionType = res[0];
  467. if (sessionType == null) {
  468. ModelUtil.emitError(self.eventEmitter, "Session with id " + sessionId + " not found.");
  469. return;
  470. }
  471. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  472. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  473. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  474. if (err) {
  475. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  476. } else {
  477. message.timestamp = message.timestamp.getTime();
  478. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  479. }
  480. });
  481. }).then(function (res) {
  482. // 推送消息
  483. ParticipantRepo.findIds(sessionId, function (err, res) {
  484. if (err) {
  485. ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
  486. } else {
  487. message.session_id = sessionId;
  488. res.forEach(function (participant) {
  489. if (participant.id !== message.sender_id) {
  490. Sessions.pushNotification(participant.id, message);
  491. }
  492. });
  493. }
  494. })
  495. }).catch(function (err) {
  496. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  497. })
  498. } else {
  499. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  500. }
  501. });
  502. }
  503. /**
  504. * 保存消息
  505. *
  506. * @param message
  507. * @param sessionId
  508. * @param handler
  509. */
  510. saveMessageByTopic(message, sessionId, handler) {
  511. let messages = new Messages();
  512. let participants = new Participants();
  513. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  514. let messageId = mongoose.Types.ObjectId().toString();
  515. let self = this;
  516. let sessionType = 0;
  517. let sessionName = "";
  518. message.id = messageId;
  519. // 发送成员必须处于会话中
  520. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  521. if (res) {
  522. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  523. sessionType = res[0];
  524. sessionName = res[1];
  525. if (!sessionType || !sessionName) {
  526. logger.error("session is error for key " + session_key);
  527. throw "session is not found";
  528. }
  529. }).then(function (res) {
  530. // 更新消息存储
  531. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  532. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  533. // 更新会话最新状态及成员最后一次消息获取时间
  534. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  535. Messages.updateLastContent(session_key, sessionType, sessionName, message);
  536. handler(null, messageId);
  537. }).then(function (res) {
  538. // 推送消息
  539. ParticipantRepo.findIds(sessionId, function (err, res) {
  540. if (err) {
  541. ModelUtil.logError("Push message from topic: get participant's id list failed: ", err);
  542. } else {
  543. message.session_id = sessionId;
  544. res.forEach(function (participant) {
  545. if (participant.id !== message.sender_id) {
  546. Sessions.pushNotification(participant.id, message);
  547. }
  548. });
  549. }
  550. })
  551. }).catch(function (err) {
  552. ModelUtil.emitError(self.eventEmitter, "Error occurred while save message to topic: ");
  553. handler(err, messageId)
  554. })
  555. } else {
  556. handler("用户不在此会话当中!", messageId);
  557. }
  558. })
  559. }
  560. /**
  561. * 置顶操作
  562. */
  563. stickSession(sessionId, user) {
  564. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  565. let self = this;
  566. //取出最大的session
  567. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  568. //获取该session的时间搓
  569. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  570. let nowtime = new Date().getTime();
  571. //当前时间搓比redis的时间搓更早证明没有置顶过
  572. if (scoreres <= nowtime) {
  573. //初始化置顶
  574. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  575. logger.info("stickSession:" + sessionId + ",res:" + res);
  576. ModelUtil.emitOK(self.eventEmitter, {});
  577. }).then(function () {
  578. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  579. })
  580. } else {
  581. //已有置顶的数据,取出来加1保存回去
  582. scoreres = Number(scoreres) + 1;
  583. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  584. logger.info("stickSession:" + sessionId + ",res:" + res);
  585. ModelUtil.emitOK(self.eventEmitter, {});
  586. }).then(function () {
  587. SessionRepo.saveStickySession(sessionId, user, scoreres);
  588. })
  589. }
  590. })
  591. })
  592. }
  593. /**
  594. * 取消会话置顶
  595. */
  596. cancelStickSession(sessionId, user) {
  597. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  598. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  599. let self = this;
  600. redis.zscoreAsync(participants_key, user).then(function (res) {
  601. if (!res) {
  602. res = new Date().getTime();
  603. }
  604. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  605. logger.info("cancelStickSession:" + sessionId);
  606. ModelUtil.emitOK(self.eventEmitter, {});
  607. }).then(function () {
  608. SessionRepo.unstickSession(sessionId, user);
  609. });
  610. })
  611. }
  612. /**
  613. * 更新会话参与者的最后消息获取时间。
  614. *
  615. * @param sessionId
  616. * @param userId
  617. */
  618. static updateParticipantLastFetchTime(sessionId, userId, score) {
  619. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  620. redis.zaddAsync(participantsKey, score, userId)
  621. .then(function (res) {
  622. ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) {
  623. if (err) {
  624. logger.error("Update participant last fetch time failed: ", err);
  625. }
  626. });
  627. })
  628. .catch(function (err) {
  629. logger.error("Update participant last fetch time failed: ", err);
  630. });
  631. }
  632. /**
  633. * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。
  634. *
  635. * @param targetUserId
  636. * @param message
  637. */
  638. static pushNotification(targetUserId, message) {
  639. Users.isPatientId(targetUserId, function (err, isPatient) {
  640. if (isPatient) {
  641. WechatClient.sendMessage(targetUserId, message);
  642. }
  643. else {
  644. AppClient.sendNotification(targetUserId, message);
  645. }
  646. });
  647. }
  648. }
  649. // Expose class
  650. module.exports = Sessions;