sessions.js 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980
  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Users = require('../user/users');
  10. let Participants = require('./Participants');
  11. let SessionRepo = require('../../repository/mysql/session.repo');
  12. let TopicRepo = require('../../repository/mysql/topics.repo');
  13. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  14. let MessageRepo = require('../../repository/mysql/message.repo');
  15. let WechatClient = require("../client/wechat.client.js");
  16. let AppClient = require("../client/app.client.js");
  17. let configFile = require('../../include/commons').CONFIG_FILE;
  18. let config = require('../../resources/config/' + configFile);
  19. let redis = RedisClient.redisClient().connection;
  20. let logger = require('../../util/log.js');
  21. let mongoose = require('mongoose');
  22. let async = require("async");
  23. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  24. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  25. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  26. const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
  27. class Sessions extends RedisModel {
  28. constructor() {
  29. super();
  30. }
  31. /**
  32. * 创建会话。会话的ID来源:
  33. * MUC:患者的ID
  34. * P2P:对成员的ID排序后,取hash值
  35. * GROUP:团队的ID
  36. *
  37. * @param sessionId
  38. * @param name 会话名称
  39. * @param type 会话类型
  40. * @param participantArray 会话成员
  41. * @param handler 回调,仅MUC模式使用
  42. */
  43. createSession(sessionId, name, type, participantArray, handler) {
  44. let self = this;
  45. let messageId = mongoose.Types.ObjectId().toString();
  46. //创建session到mysql
  47. self.createSessionToMysql(sessionId, name, type, participantArray,messageId,function(err,res){
  48. if(err){
  49. logger.error(err);
  50. }else{
  51. logger.info("create session to mysql success :" +JSON.stringify(res));
  52. }
  53. });
  54. //创建session到redis
  55. self.createSessionToRedis(sessionId, name, type, participantArray,messageId, function(err,res){
  56. if(err){
  57. if(handler){handler(err,null);return;};
  58. ModelUtil.emitError(self.eventEmitter, {message:err,status:-1}, null);
  59. }else{
  60. if(handler){handler(null,res);return;};
  61. ModelUtil.emitOK(self.eventEmitter,{status:200,data:res});
  62. }
  63. });
  64. }
  65. /**
  66. * 创建会话。REDIS
  67. * @param sessionId
  68. * @param name
  69. * @param type
  70. * @param participantArray
  71. * @param messageId
  72. * @param handler
  73. * @returns {boolean}
  74. */
  75. createSessionToRedis(sessionId, name, type, participantArray,messageId, handler){
  76. let self = this;
  77. let messages = new Messages();
  78. var participantIdArray = [];
  79. for (let i in participantArray) {
  80. participantIdArray.push(participantArray[i].split(":")[0]);
  81. }
  82. if (type == SESSION_TYPES.P2P||type==SESSION_TYPES.SYSTEM) {
  83. if(sessionId){
  84. callBusinessType(sessionId);
  85. return;
  86. }
  87. if (participantIdArray.length != 2) {
  88. handler("P2P session only allow 2 participants.",null);
  89. return false;
  90. }
  91. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  92. sessionId = res;
  93. callBusinessType(sessionId);
  94. });
  95. } else {
  96. if(!sessionId){
  97. handler("MUC OR GROUP session sessionId is not allow null .",null);
  98. return;
  99. }
  100. callBusinessType(sessionId);
  101. }
  102. function callBusinessType(sessionId) {
  103. ParticipantRepo.getBusinessType(participantIdArray.join("','"),function(err,businessType){
  104. callCreate(sessionId, businessType);
  105. });
  106. }
  107. function callCreate(sessionId, businessType) {
  108. let createDate = new Date();
  109. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  110. let message = {
  111. sender_id: "system",
  112. sender_name: "system",
  113. content_type: 11,
  114. content: "会话创建成功",
  115. timestamp: createDate,
  116. id:messageId
  117. };
  118. let session = {
  119. id :sessionId,
  120. name:name,
  121. type:type,
  122. create_date:createDate.getTime(),
  123. business_type:businessType,
  124. last_sender_id : message.sender_id,
  125. last_sender_name : message.sender_name,
  126. last_message_time: message.timestamp.getTime(),
  127. last_content : message.content,
  128. last_content_type : message.content_type
  129. }
  130. redis.hmsetAsync(sessionKey, session).then(function(){
  131. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  132. handler(null, session);
  133. //messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message);
  134. });
  135. })
  136. }
  137. }
  138. /**
  139. * 创建会话。mysql
  140. * @param sessionId
  141. * @param name
  142. * @param type
  143. * @param participantArray
  144. * @param messageId
  145. * @param handler
  146. */
  147. createSessionToMysql(sessionId, name, type, participantArray,messageId, handler){
  148. let self = this;
  149. //如果sessionId不存在则执行创建sessionId过程
  150. let participantIdArray = [];
  151. for (let i in participantArray) {
  152. participantIdArray.push(participantArray[i].split(":")[0]);
  153. }
  154. //流程1-判断是否存在sessionId不存在则创建对应的sessionId;
  155. if(!sessionId){
  156. if (type == SESSION_TYPES.P2P||type==SESSION_TYPES.SYSTEM) {
  157. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  158. sessionId = res;
  159. callBusinessType();
  160. });
  161. }else{
  162. return handler("MUC模式和团队模式,不允许sessionId为空!",null);
  163. }
  164. }else{
  165. callBusinessType();
  166. }
  167. //流程2-判断session的业务类型;
  168. function callBusinessType(){
  169. ParticipantRepo.getBusinessType(participantIdArray.join("','"),function(err,businessType){
  170. if(err){handler(err,null);return;}
  171. callCreateSession(businessType);
  172. });
  173. }
  174. //流程3-发起session创建 返回session实例
  175. function callCreateSession(businessType){
  176. //查找该sessionId是否存在存在则直接返回实例
  177. SessionRepo.findOne(sessionId, function (err, res) {
  178. if (res.length > 0) {//已经存在
  179. handler(null,res[0]);
  180. }else{
  181. let createDate = new Date();
  182. let session ={
  183. id :sessionId,
  184. name:name,
  185. type:type,
  186. create_date:createDate.getTime(),
  187. business_type:businessType
  188. };
  189. //将session写入数据库
  190. self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
  191. if(err){handler(err,null);return;};
  192. callCreateParticipants(session);
  193. })
  194. }
  195. });
  196. }
  197. //流程4-发起session成员创建
  198. function callCreateParticipants(session){
  199. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  200. if(err){handler(err,null);return;};
  201. callBeginTrans(session);
  202. })
  203. }
  204. //流程5-发起session会话
  205. function callBeginTrans(session) {
  206. let mesDate = new Date();
  207. let message = {
  208. sender_id: "system",
  209. sender_name: "system",
  210. content_type: 6,
  211. content: "会话创建成功",
  212. timestamp: mesDate,
  213. id:messageId
  214. };
  215. session.last_sender_id = message.sender_id;
  216. session.last_sender_name = message.sender_name;
  217. session.last_message_time = mesDate.getTime();
  218. session.last_content = message.content;
  219. session.last_content_type = message.content_type;
  220. SessionRepo.updateSessionLastStatus(message.sender_id,
  221. message.sender_name,
  222. message.timestamp,
  223. message.content,
  224. message.content_type,
  225. sessionId, function (err, res) {
  226. if (err) {handler(err,null);return;};
  227. handler(null,session);
  228. });
  229. }
  230. }
  231. /**
  232. * 最近会话列表,7天内。
  233. *
  234. * @param userId
  235. * @param dateSpan
  236. */
  237. getRecentSessions(userId, dateSpan){
  238. let self = this;
  239. SessionRepo.findAllByTimestampAndType(userId, dateSpan, function (err, res) {
  240. if(err){
  241. ModelUtil.emitError(self.eventEmitter, "Get recent sessions failed", err);
  242. return;
  243. }
  244. let sessions = [];
  245. res.forEach(function (session) {
  246. sessions.push({
  247. id: session.id,
  248. name: session.name,
  249. type: session.type,
  250. business_type: session.business_type,
  251. create_date: session.create_date
  252. })
  253. });
  254. ModelUtil.emitOK(self.eventEmitter, sessions);
  255. });
  256. }
  257. /**
  258. * 保存session到MySQL
  259. * @param sessionId
  260. * @param name
  261. * @param type
  262. * @param createDate
  263. * @param businessType
  264. * @param handler
  265. */
  266. saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {
  267. SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler);
  268. }
  269. /**
  270. * 获取某个用户的全部session列表
  271. * @param userId
  272. * @param handler
  273. */
  274. getUserSessionsFromMysql(userId, handler) {
  275. SessionRepo.findAll(userId, handler);
  276. }
  277. /**
  278. * 获取session单个对象
  279. * @param sessionId
  280. * @param handler
  281. */
  282. getSessions(sessionId, handler) {
  283. SessionRepo.findOne(sessionId, handler);
  284. }
  285. /**
  286. * 根据用户ID获取用户的session列表
  287. * @param userId
  288. * @param page
  289. * @param size
  290. */
  291. getUserSessions(userId, page, size, businessType) {
  292. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  293. let self = this;
  294. if (page > 0) {
  295. if (page == 1) {
  296. page = 0;
  297. }
  298. page = page + page * size;
  299. }
  300. async.waterfall([
  301. // 获取会话ID列表
  302. function (callback) {
  303. redis.zrevrangeAsync(userSessionKey, page, size)
  304. .then(function (sessionIds) {
  305. if (sessionIds.length == 0) {
  306. ModelUtil.emitOK(self.eventEmitter, []);
  307. return;
  308. }
  309. callback(null, sessionIds);
  310. })
  311. },
  312. // 遍历会话
  313. function (sessionIds, callback) {
  314. let sessionList = [];
  315. sessionIds.forEach(function (sessionId) {
  316. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  317. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  318. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  319. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants,sessionId);
  320. redis.multi()
  321. .hgetall(sessionKey) // 会话实体
  322. .hget(participantsRoleKey, userId) // 用户在此会话中的角色
  323. .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
  324. .zrange(participantsKey,0,-1)
  325. .execAsync()
  326. .then(function (res) {
  327. let session = res[0];
  328. let role = res[1];
  329. let lastFetchTime = res[2];
  330. let users = res[3];
  331. let sessionName="";
  332. let otheruserId ="";
  333. if(session.type==SESSION_TYPES.P2P){
  334. for(var j in users){
  335. if(users[j]!=userId){
  336. otheruserId = users[j];
  337. }
  338. }
  339. }
  340. if(!role)role =0;
  341. if(!lastFetchTime)lastFetchTime=new Date().getTime();
  342. // 计算未读消息数
  343. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  344. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
  345. .then(function (count) {
  346. if(!otheruserId)otheruserId=userId;
  347. ParticipantRepo.findNameById(otheruserId, function (err, res) {
  348. if((res&&res.length==0)||session.type!=SESSION_TYPES.P2P){
  349. sessionName = session.name;
  350. }else{
  351. sessionName = res[0].name;
  352. }
  353. sessionList.push({
  354. id: sessionId,
  355. name: sessionName,
  356. create_date: session.create_date,
  357. last_content_type: session.last_content_type,
  358. last_content: session.last_content,
  359. sender_id: session.sender_id,
  360. type: session.type,
  361. sender_name: session.sender_name,
  362. unread_count: count,
  363. business_type: session.business_type,
  364. my_role: role
  365. });
  366. if (sessionId === sessionIds[sessionIds.length - 1]) {
  367. ModelUtil.emitOK(self.eventEmitter, sessionList);
  368. }
  369. })
  370. })
  371. }).catch(function (err) {
  372. logger.error("Get sessions failed: ", ex);
  373. ModelUtil.emitError(self.eventEmitter, "Get sessions failed: " + err);
  374. });
  375. });
  376. }
  377. ]);
  378. }
  379. /**
  380. * 获取会话消息。全部,不管已读/未读状态。
  381. *
  382. * @param sessionId 会话ID
  383. * @param userId 拉取消息的人
  384. * @param page 第几页
  385. * @param pagesize 分页数量
  386. * @param start_msg_id 消息会话最新的一条消息的ID
  387. * @param end_msg_id 消息会话刚开始的消息ID
  388. */
  389. getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset,handler) {
  390. let self = this;
  391. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  392. if (!start_msg_id && !end_msg_id) {
  393. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  394. if (res.length == 0) {
  395. if(handler){
  396. handler(null,res);
  397. return;
  398. }
  399. ModelUtil.emitOK(self.eventEmitter, res);
  400. return;
  401. }
  402. start_msg_id = res[0];
  403. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  404. if (res.length == 0) {
  405. if(handler){
  406. handler(null,res);
  407. return;
  408. }
  409. ModelUtil.emitOK(self.eventEmitter, res);
  410. return;
  411. }
  412. end_msg_id = res[0];
  413. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  414. if (err) {
  415. if(handler){
  416. handler(err,null);
  417. return;
  418. }
  419. logger.error("getMessagesByPage error" + err);
  420. ModelUtil.emitError(self.eventEmitter, err, err);
  421. } else {
  422. if(handler){
  423. handler(null,res);
  424. return;
  425. }
  426. ModelUtil.emitOK(self.eventEmitter, res);
  427. }
  428. })
  429. })
  430. })
  431. } else if (!start_msg_id) {
  432. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  433. if (res.length == 0) {
  434. if(handler){
  435. handler(null,res);
  436. return;
  437. }
  438. ModelUtil.emitOK(self.eventEmitter, res);
  439. return;
  440. }
  441. start_msg_id = res[0];
  442. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  443. if (err) {
  444. if(handler){
  445. handler(err,null);
  446. return;
  447. }
  448. logger.error("getMessagesByPage error" + err);
  449. ModelUtil.emitError(self.eventEmitter, err, err);
  450. } else {
  451. if(handler){
  452. handler(null,res);
  453. return;
  454. }
  455. ModelUtil.emitOK(self.eventEmitter, res);
  456. }
  457. })
  458. })
  459. } else if (!end_msg_id) {
  460. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  461. if (res.length == 0) {
  462. ModelUtil.emitOK(self.eventEmitter, res);
  463. return;
  464. }
  465. end_msg_id = res[0];
  466. self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, function (err, res) {
  467. if (err) {
  468. if(handler){
  469. handler(err,null);
  470. return;
  471. }
  472. logger.error("getMessagesByPage error" + err);
  473. ModelUtil.emitError(self.eventEmitter, err, err);
  474. } else {
  475. if(handler){
  476. handler(null,res);
  477. return;
  478. }
  479. ModelUtil.emitOK(self.eventEmitter, res);
  480. }
  481. })
  482. })
  483. } else {
  484. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  485. if (err) {
  486. if(handler){
  487. handler(err,null);
  488. return;
  489. }
  490. logger.error("getMessagesByPage error" + err);
  491. ModelUtil.emitError(self.eventEmitter, err, err);
  492. } else {
  493. if(handler){
  494. handler(null,res);
  495. return;
  496. }
  497. ModelUtil.emitOK(self.eventEmitter, res);
  498. }
  499. })
  500. }
  501. }
  502. /**
  503. * 分页获取会话消息。
  504. *
  505. * @param sessionId 必选。会话ID
  506. * @param userId 必选。用户ID
  507. * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据
  508. * @param endMsgId 必选。会话中的结束消息ID
  509. * @param page 必选。页码
  510. * @param size 必选。页面大小
  511. * @param handler 必选。回调
  512. */
  513. getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
  514. let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  515. let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  516. let participants = new Participants();
  517. let offset = (page - 1 < 0 ? 0 : page - 1) * size;
  518. let count = size;
  519. if (page > 1 || isoffset == 1) {
  520. offset += 1; // 翻页由于闭区间,需跳过本身数据
  521. }
  522. participants.existsParticipant(sessionId, userId, function (err, res) {
  523. if (!res) {
  524. handler(Error("User not found in session " + sessionId), null);
  525. } else {
  526. //将消息ID转换成分值
  527. redis.multi()
  528. .zscore(messagesTimestampKey, startMsgId)
  529. .zscore(messagesTimestampKey, endMsgId)
  530. .execAsync()
  531. .then(function (res) {
  532. let startMsgScore = res[1];
  533. let endMsgScore = res[0];
  534. if (startMsgScore == null || endMsgScore == null || startMsgScore == endMsgScore) {
  535. handler(null, []);
  536. return;
  537. }
  538. // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
  539. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  540. .then(function (res) {
  541. if (res.length == 0) {
  542. handler(null, []);
  543. return;
  544. }
  545. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  546. handler(null, messages);
  547. }).then(function () {
  548. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  549. })
  550. })
  551. .catch(function (err) {
  552. logger.error("Get message by page failed: ", err);
  553. handler(err, false);
  554. })
  555. })
  556. }
  557. })
  558. }
  559. /**
  560. * 获取所有会话的未读消息数。
  561. */
  562. getAllSessionsUnreadMessageCount(userId) {
  563. let self = this;
  564. let count = 0;
  565. SessionRepo.findAll(userId,function(err,res){
  566. if(err){
  567. logger.err("getAllSessionsUnreadMessageCount is fail :"+err);
  568. return;
  569. }
  570. if(res.length==0){
  571. ModelUtil.emitOK(self.eventEmitter, {count: count});
  572. return;
  573. }
  574. for(var j in res){
  575. if(res[j].type==SESSION_TYPES.SYSTEM){
  576. if(j==res.length-1){
  577. ModelUtil.emitOK(self.eventEmitter, {count: count});
  578. }
  579. continue;
  580. }
  581. callback(res,j,res[j]);
  582. }
  583. })
  584. function callback(res,j,session){
  585. self.getSessionUnreadMessageCount(res[j].id,userId,function(err,con){
  586. if(err){
  587. logger.err("getAllSessionsUnreadMessageCount is fail :"+err);
  588. }
  589. count = count+con;
  590. if(j==res.length-1){
  591. ModelUtil.emitOK(self.eventEmitter, {count: count});
  592. }
  593. })
  594. }
  595. }
  596. /**
  597. * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
  598. *
  599. * @param sessionId
  600. * @param userId
  601. */
  602. getSessionUnreadMessageCount(sessionId, userId,handler) {
  603. let self = this;
  604. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  605. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  606. async.waterfall([
  607. // 此成员最后获取消息的时间
  608. function (callback) {
  609. redis.zscoreAsync(participantsKey, userId)
  610. .then(function (lastFetchTime) {
  611. callback(null, lastFetchTime);
  612. })
  613. },
  614. // 计算最后获取消息的时间之后到现在有多少条消息
  615. function (lastFetchTime, callback) {
  616. if (!lastFetchTime) lastFetchTime = 0;
  617. let now = new Date().getTime();
  618. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
  619. .then(function (count) {
  620. if(handler){
  621. handler(null,count);
  622. }else{
  623. ModelUtil.emitOK(self.eventEmitter, {count: count});
  624. }
  625. })
  626. }
  627. ], function (err, res) {
  628. if (err) {
  629. if(handler){
  630. handler(err,0);
  631. }else{
  632. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  633. }
  634. }
  635. });
  636. }
  637. /**
  638. * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
  639. */
  640. getSessionUnreadMessages(sessionId, userId) {
  641. let self = this;
  642. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  643. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  644. async.waterfall([
  645. // 此成员最后获取消息的时间
  646. function (callback) {
  647. redis.zscoreAsync(participantsKey, userId)
  648. .then(function (lastFetchTime) {
  649. callback(null, lastFetchTime);
  650. })
  651. },
  652. // 最后获取消息的时间之后到现在的消息ID列表
  653. function (lastFetchTime, callback) {
  654. if (!lastFetchTime) lastFetchTime = 0;
  655. let now = new Date().getTime();
  656. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
  657. .then(function (messageIds) {
  658. callback(null, messageIds);
  659. })
  660. },
  661. // 获取消息
  662. function (messageIds, callback) {
  663. if (messageIds.length == 0) {
  664. ModelUtil.emitOK(self.eventEmitter, []);
  665. return;
  666. }
  667. let startMsgId = messageIds[0];
  668. let endMsgId = messageIds[messageIds.length - 1];
  669. self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
  670. if (err) {
  671. ModelUtil.emitError(self.eventEmitter, err.message);
  672. return;
  673. }
  674. ModelUtil.emitOK(self.eventEmitter, res);
  675. });
  676. }
  677. ], function (err, res) {
  678. if (err) {
  679. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  680. }
  681. });
  682. }
  683. /**
  684. * 保存消息。
  685. *
  686. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  687. *
  688. * see also: saveMessageByTopic
  689. *
  690. * @param message
  691. * @param sessionId
  692. */
  693. saveMessageBySession(sessionId, message) {
  694. let self = this;
  695. let messages = new Messages();
  696. let participants = new Participants();
  697. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  698. let messageId = mongoose.Types.ObjectId().toString();
  699. message.id = messageId;
  700. // 检查会话中是否存在此成员
  701. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  702. if (err) {
  703. ModelUtil.emitError(self.eventEmitter, "Check session paticipant failed: ", err);
  704. return;
  705. }
  706. if (res) {
  707. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  708. let sessionType = res[0];
  709. let sessionName = res[1];
  710. if (sessionType == null) {
  711. ModelUtil.emitError(self.eventEmitter, "Session with id " + sessionId + " not found.");
  712. return;
  713. }
  714. //消息保存到REDIS
  715. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  716. //更新REDIS最后一条消息
  717. Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
  718. //更新用户最后一次获取消息的时间
  719. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  720. //更新最后一条消息到数据库
  721. SessionRepo.updateSessionLastStatus(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionId);
  722. //将消息保存到数据库
  723. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  724. if (err) {
  725. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  726. } else {
  727. message.timestamp = message.timestamp.getTime();
  728. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  729. }
  730. });
  731. }).then(function (res) {
  732. // 推送消息
  733. ParticipantRepo.findIds(sessionId, function (err, res) {
  734. if (err) {
  735. ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
  736. } else {
  737. message.session_id = sessionId;
  738. res.forEach(function (participant) {
  739. if (participant.id !== message.sender_id) {
  740. Sessions.pushNotification(participant.id, message);
  741. }
  742. });
  743. }
  744. })
  745. }).catch(function (err) {
  746. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  747. })
  748. } else {
  749. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  750. }
  751. });
  752. }
  753. sendTopicMessages(topicId,message){
  754. let self = this;
  755. TopicRepo.findAllByTopicId(topicId,function(err,res){
  756. if(err||res.length==0){
  757. ModelUtil.emitOK(self.eventEmitter, {status:-1,"message": "议题获取失败"});
  758. return;
  759. }
  760. self.saveMessageByTopic(message,res[0].session_id,function(err,messageId){
  761. if(err){
  762. ModelUtil.emitOK(self.eventEmitter, {status:-1,"message":err});
  763. }else{
  764. message.id = messageId;
  765. ModelUtil.emitOK(self.eventEmitter, {status:200,"message":"发送成功",data:message});
  766. }
  767. });
  768. });
  769. }
  770. /**
  771. * 保存消息
  772. *
  773. * @param message
  774. * @param sessionId
  775. * @param handler
  776. */
  777. saveMessageByTopic(message, sessionId, handler) {
  778. let messages = new Messages();
  779. let participants = new Participants();
  780. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  781. let messageId = mongoose.Types.ObjectId().toString();
  782. let self = this;
  783. let sessionType = 0;
  784. let sessionName = "";
  785. message.id = messageId;
  786. // 发送成员必须处于会话中
  787. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  788. if (res) {
  789. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  790. sessionType = res[0];
  791. sessionName = res[1];
  792. if (!sessionType || !sessionName) {
  793. logger.error("session is error for key " + session_key);
  794. throw "session is not found";
  795. }
  796. }).then(function (res) {
  797. // 更新消息存储REDIS
  798. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  799. //更新消息存储到mysql
  800. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  801. // 更新会话最新状态及成员最后一次消息获取时间
  802. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  803. //更新最后一条消息
  804. Messages.updateLastContent(session_key, sessionType, sessionName, message);
  805. //更新session实体的最后一条消息
  806. SessionRepo.updateSessionLastStatus(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionId);
  807. handler(null, messageId);
  808. }).then(function (res) {
  809. // 推送消息
  810. ParticipantRepo.findIds(sessionId, function (err, res) {
  811. if (err) {
  812. handler(err, messageId)
  813. } else {
  814. message.session_id = sessionId;
  815. res.forEach(function (participant) {
  816. if (participant.id !== message.sender_id) {
  817. Sessions.pushNotification(participant.id, message);
  818. }
  819. });
  820. }
  821. })
  822. }).catch(function (err) {
  823. handler(err, messageId)
  824. })
  825. } else {
  826. handler("用户不在此会话当中!", messageId);
  827. }
  828. })
  829. }
  830. /**
  831. * 置顶操作
  832. */
  833. stickSession(sessionId, user) {
  834. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  835. let self = this;
  836. //取出最大的session
  837. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  838. //获取该session的时间搓
  839. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  840. let nowtime = new Date().getTime();
  841. //当前时间搓比redis的时间搓更早证明没有置顶过
  842. if (scoreres <= nowtime) {
  843. //初始化置顶
  844. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  845. logger.info("stickSession:" + sessionId + ",res:" + res);
  846. ModelUtil.emitOK(self.eventEmitter, {});
  847. }).then(function () {
  848. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  849. })
  850. } else {
  851. //已有置顶的数据,取出来加1保存回去
  852. scoreres = Number(scoreres) + 1;
  853. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  854. logger.info("stickSession:" + sessionId + ",res:" + res);
  855. ModelUtil.emitOK(self.eventEmitter, {});
  856. }).then(function () {
  857. SessionRepo.saveStickySession(sessionId, user, scoreres);
  858. })
  859. }
  860. })
  861. })
  862. }
  863. /**
  864. * 取消会话置顶
  865. */
  866. cancelStickSession(sessionId, user) {
  867. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  868. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  869. let self = this;
  870. redis.zscoreAsync(participants_key, user).then(function (res) {
  871. if (!res) {
  872. res = new Date().getTime();
  873. }
  874. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  875. logger.info("cancelStickSession:" + sessionId);
  876. ModelUtil.emitOK(self.eventEmitter, {});
  877. }).then(function () {
  878. SessionRepo.unstickSession(sessionId, user);
  879. });
  880. })
  881. }
  882. /**
  883. * 更新会话参与者的最后消息获取时间。
  884. *
  885. * @param sessionId
  886. * @param userId
  887. */
  888. static updateParticipantLastFetchTime(sessionId, userId, score) {
  889. score = score+1;
  890. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  891. redis.zaddAsync(participantsKey, score, userId)
  892. .then(function (res) {
  893. ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) {
  894. if (err) {
  895. logger.error("Update participant last fetch time failed: ", err);
  896. }
  897. });
  898. })
  899. .catch(function (err) {
  900. logger.error("Update participant last fetch time failed: ", err);
  901. });
  902. }
  903. /**
  904. * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。
  905. *
  906. * @param targetUserId
  907. * @param message
  908. */
  909. static pushNotification(targetUserId, message) {
  910. Users.isPatientId(targetUserId, function (err, isPatient) {
  911. if (isPatient) {
  912. WechatClient.sendMessage(targetUserId, message);
  913. }
  914. else {
  915. AppClient.sendNotification(targetUserId, message);
  916. }
  917. });
  918. }
  919. }
  920. // Expose class
  921. module.exports = Sessions;