sessions.js 39 KB


  1. /**
  2. * 会话模型。
  3. */
  4. "use strict";
  5. let RedisClient = require('../../repository/redis/redis.client.js');
  6. let RedisModel = require('./../redis.model.js');
  7. let ModelUtil = require('../../util/model.util');
  8. let Messages = require('../messages/messages');
  9. let Users = require('../user/users');
  10. let Participants = require('./Participants');
  11. let SessionRepo = require('../../repository/mysql/session.repo');
  12. let TopicRepo = require('../../repository/mysql/topics.repo');
  13. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  14. let MessageRepo = require('../../repository/mysql/message.repo');
  15. let WechatClient = require("../client/wechat.client.js");
  16. let AppClient = require("../client/app.client.js");
  17. let configFile = require('../../include/commons').CONFIG_FILE;
  18. let config = require('../../resources/config/' + configFile);
  19. let redis = RedisClient.redisClient().connection;
  20. let logger = require('../../util/log.js');
  21. let mongoose = require('mongoose');
  22. let async = require("async");
  23. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  24. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  25. const STICKY_SESSION_BASE_SCORE = require('../../include/commons').STICKY_SESSION_BASE_SCORE;
  26. const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
  27. class Sessions extends RedisModel {
  28. constructor() {
  29. super();
  30. }
  31. /**
  32. * 创建会话。会话的ID来源:
  33. * MUC:患者的ID
  34. * P2P:对成员的ID排序后,取hash值
  35. * GROUP:团队的ID
  36. *
  37. * @param sessionId
  38. * @param name 会话名称
  39. * @param type 会话类型
  40. * @param participantArray 会话成员
  41. * @param handler 回调,仅MUC模式使用
  42. */
  43. createSession(sessionId, name, type, participantArray, handler) {
  44. let self = this;
  45. let messageId = mongoose.Types.ObjectId().toString();
  46. //创建session到mysql
  47. self.createSessionToMysql(sessionId, name, type, participantArray,messageId,function(err,res){
  48. if(err){
  49. logger.error(err);
  50. }else{
  51. logger.info("create session to mysql success :" +JSON.stringify(res));
  52. }
  53. });
  54. //创建session到redis
  55. self.createSessionToRedis(sessionId, name, type, participantArray,messageId, function(err,res){
  56. if(err){
  57. if(handler){handler(err,null);return;};
  58. ModelUtil.emitError(self.eventEmitter, {message:err,status:-1}, null);
  59. }else{
  60. if(handler){handler(null,res);return;};
  61. ModelUtil.emitOK(self.eventEmitter,{status:200,data:res});
  62. }
  63. });
  64. }
  65. /**
  66. * 创建会话。REDIS
  67. * @param sessionId
  68. * @param name
  69. * @param type
  70. * @param participantArray
  71. * @param messageId
  72. * @param handler
  73. * @returns {boolean}
  74. */
  75. createSessionToRedis(sessionId, name, type, participantArray,messageId, handler){
  76. let self = this;
  77. let messages = new Messages();
  78. var participantIdArray = [];
  79. for (let i in participantArray) {
  80. participantIdArray.push(participantArray[i].split(":")[0]);
  81. }
  82. if (type == SESSION_TYPES.P2P||type==SESSION_TYPES.SYSTEM) {
  83. if(sessionId){
  84. callBusinessType(sessionId);
  85. return;
  86. }
  87. if (participantIdArray.length != 2) {
  88. handler("P2P session only allow 2 participants.",null);
  89. return false;
  90. }
  91. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  92. sessionId = res;
  93. callBusinessType(sessionId);
  94. });
  95. } else {
  96. if(!sessionId){
  97. handler("MUC OR GROUP session sessionId is not allow null .",null);
  98. return;
  99. }
  100. callBusinessType(sessionId);
  101. }
  102. function callBusinessType(sessionId) {
  103. ParticipantRepo.getBusinessType(participantIdArray.join("','"),function(err,businessType){
  104. callCreate(sessionId, businessType);
  105. });
  106. }
  107. function callCreate(sessionId, businessType) {
  108. let createDate = new Date();
  109. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  110. let message = {
  111. sender_id: "system",
  112. sender_name: "system",
  113. content_type: 11,
  114. content: "会话创建成功",
  115. timestamp: createDate,
  116. id:messageId
  117. };
  118. let session = {
  119. id :sessionId,
  120. name:name,
  121. type:type,
  122. create_date:createDate.getTime(),
  123. business_type:businessType,
  124. last_sender_id : message.sender_id,
  125. last_sender_name : message.sender_name,
  126. last_message_time: message.timestamp.getTime(),
  127. last_content : message.content,
  128. last_content_type : message.content_type
  129. }
  130. redis.hmsetAsync(sessionKey, session).then(function(){
  131. Participants.saveParticipantsToRedis(sessionId, participantArray, createDate, function (res) {
  132. handler(null, session);
  133. //messages.saveMessageToRedisFromCreateSession(sessionId, messageId, message);
  134. });
  135. })
  136. }
  137. }
  138. /**
  139. * 创建会话。mysql
  140. * @param sessionId
  141. * @param name
  142. * @param type
  143. * @param participantArray
  144. * @param messageId
  145. * @param handler
  146. */
  147. createSessionToMysql(sessionId, name, type, participantArray,messageId, handler){
  148. let self = this;
  149. //如果sessionId不存在则执行创建sessionId过程
  150. var participantIdArray = [];
  151. for (let i in participantArray) {
  152. participantIdArray.push(participantArray[i].split(":")[0]);
  153. }
  154. //流程1-判断是否存在sessionId不存在则创建对应的sessionId;
  155. if(!sessionId){
  156. if (type == SESSION_TYPES.P2P||type==SESSION_TYPES.SYSTEM) {
  157. ParticipantRepo.findSessionIdByParticipantIds(participantIdArray[0], participantIdArray[1], function (err, res) {
  158. sessionId = res;
  159. callBusinessType();
  160. });
  161. }else{
  162. handler("MUC模式和团队模式,不允许sessionId为空!",null);
  163. return;
  164. }
  165. }else{
  166. callBusinessType();
  167. }
  168. //流程2-判断session的业务类型;
  169. function callBusinessType(){
  170. ParticipantRepo.getBusinessType(participantIdArray.join("','"),function(err,businessType){
  171. if(err){handler(err,null);return;}
  172. callCreateSession(businessType);
  173. });
  174. }
  175. //流程3-发起session创建 返回session实例
  176. function callCreateSession(businessType){
  177. //查找该sessionId是否存在存在则直接返回实例
  178. SessionRepo.findOne(sessionId, function (err, res) {
  179. if (res.length > 0) {//已经存在
  180. handler(null,res[0]);
  181. }else{
  182. let createDate = new Date();
  183. let session ={
  184. id :sessionId,
  185. name:name,
  186. type:type,
  187. create_date:createDate.getTime(),
  188. business_type:businessType
  189. }
  190. //将session写入数据库
  191. self.saveSessionToMysql(sessionId, name, type, createDate, businessType, function (err, res) {
  192. if(err){handler(err,null);return;};
  193. callCreateParticipants(session);
  194. })
  195. }
  196. });
  197. }
  198. //流程4-发起session成员创建
  199. function callCreateParticipants(session){
  200. Participants.saveParticipantsToMysql(sessionId, participantArray, function (err, res) {
  201. if(err){handler(err,null);return;};
  202. callBeginTrans(session);
  203. })
  204. }
  205. //流程5-发起session会话
  206. function callBeginTrans(session) {
  207. let mesDate = new Date();
  208. let message = {
  209. sender_id: "system",
  210. sender_name: "system",
  211. content_type: 6,
  212. content: "会话创建成功",
  213. timestamp: mesDate,
  214. id:messageId
  215. };
  216. session.last_sender_id = message.sender_id;
  217. session.last_sender_name = message.sender_name;
  218. session.last_message_time = mesDate.getTime();
  219. session.last_content = message.content;
  220. session.last_content_type = message.content_type;
  221. SessionRepo.updateSessionLastStatus(message.sender_id,
  222. message.sender_name,
  223. message.timestamp,
  224. message.content,
  225. message.content_type,
  226. sessionId, function (err, res) {
  227. if (err) {handler(err,null);return;};
  228. handler(null,session);
  229. });
  230. }
  231. }
  232. /**
  233. * 保存session到MySQL
  234. * @param sessionId
  235. * @param name
  236. * @param type
  237. * @param createDate
  238. * @param handler
  239. */
  240. saveSessionToMysql(sessionId, name, type, createDate, businessType, handler) {
  241. SessionRepo.saveSession(sessionId, name, type, createDate, businessType, handler);
  242. }
  243. /**
  244. * 获取某个用户的全部session列表
  245. * @param userId
  246. * @param handler
  247. */
  248. getUserSessionsFromMysql(userId, handler) {
  249. SessionRepo.findAll(userId, handler);
  250. }
  251. /**
  252. * 获取session单个对象
  253. * @param sessionId
  254. * @param handler
  255. */
  256. getSessions(sessionId, handler) {
  257. SessionRepo.findOne(sessionId, handler);
  258. }
  259. /**
  260. * 根据用户ID获取用户的session列表
  261. * @param userId
  262. * @param page
  263. * @param size
  264. */
  265. getUserSessions(userId, page, size, businessType) {
  266. let userSessionKey = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, userId);
  267. let self = this;
  268. if (page > 0) {
  269. if (page == 1) {
  270. page = 0;
  271. }
  272. page = page + page * size;
  273. }
  274. async.waterfall([
  275. // 获取会话ID列表
  276. function (callback) {
  277. redis.zrevrangeAsync(userSessionKey, page, size)
  278. .then(function (sessionIds) {
  279. if (sessionIds.length == 0) {
  280. ModelUtil.emitOK(self.eventEmitter, []);
  281. return;
  282. }
  283. callback(null, sessionIds);
  284. })
  285. },
  286. // 遍历会话
  287. function (sessionIds, callback) {
  288. let sessionList = [];
  289. sessionIds.forEach(function (sessionId) {
  290. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  291. let participantsRoleKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipantsRole, sessionId);
  292. let sessionParticipantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  293. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants,sessionId);
  294. redis.multi()
  295. .hgetall(sessionKey) // 会话实体
  296. .hget(participantsRoleKey, userId) // 用户在此会话中的角色
  297. .zscore(sessionParticipantsKey, userId) // 用户在此会话中最后一次获取未读消息的时间
  298. .zrange(participantsKey,0,-1)
  299. .execAsync()
  300. .then(function (res) {
  301. let session = res[0];
  302. let role = res[1];
  303. let lastFetchTime = res[2];
  304. let users = res[3];
  305. let sessionName="";
  306. let otheruserId ="";
  307. if(session.type==SESSION_TYPES.P2P){
  308. for(var j in users){
  309. if(users[j]!=userId){
  310. otheruserId = users[j];
  311. }
  312. }
  313. }
  314. if(!role)role =0;
  315. if(!lastFetchTime)lastFetchTime=new Date().getTime();
  316. // 计算未读消息数
  317. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  318. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, new Date().getTime())
  319. .then(function (count) {
  320. if(!otheruserId)otheruserId=userId;
  321. ParticipantRepo.findNameById(otheruserId, function (err, res) {
  322. if((res&&res.length==0)||session.type!=SESSION_TYPES.P2P){
  323. sessionName = session.name;
  324. }else{
  325. sessionName = res[0].name;
  326. }
  327. sessionList.push({
  328. id: sessionId,
  329. name: sessionName,
  330. create_date: session.create_date,
  331. last_content_type: session.last_content_type,
  332. last_content: session.last_content,
  333. sender_id: session.sender_id,
  334. type: session.type,
  335. sender_name: session.sender_name,
  336. unread_count: count,
  337. business_type: session.business_type,
  338. my_role: role
  339. });
  340. if (sessionId === sessionIds[sessionIds.length - 1]) {
  341. ModelUtil.emitOK(self.eventEmitter, sessionList);
  342. }
  343. })
  344. })
  345. }).catch(function (err) {
  346. logger.error("Get sessions failed: ", ex);
  347. ModelUtil.emitError(self.eventEmitter, "Get sessions failed: " + err);
  348. });
  349. });
  350. }
  351. ]);
  352. }
  353. /**
  354. * 获取会话消息。全部,不管已读/未读状态。
  355. *
  356. * @param sessionId 会话ID
  357. * @param userId 拉取消息的人
  358. * @param page 第几页
  359. * @param pagesize 分页数量
  360. * @param start_msg_id 消息会话最新的一条消息的ID
  361. * @param end_msg_id 消息会话刚开始的消息ID
  362. */
  363. getMessages(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset,handler) {
  364. let self = this;
  365. let message_timestamp_key = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  366. if (!start_msg_id && !end_msg_id) {
  367. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  368. if (res.length == 0) {
  369. if(handler){
  370. handler(null,res);
  371. return;
  372. }
  373. ModelUtil.emitOK(self.eventEmitter, res);
  374. return;
  375. }
  376. start_msg_id = res[0];
  377. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  378. if (res.length == 0) {
  379. if(handler){
  380. handler(null,res);
  381. return;
  382. }
  383. ModelUtil.emitOK(self.eventEmitter, res);
  384. return;
  385. }
  386. end_msg_id = res[0];
  387. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  388. if (err) {
  389. if(handler){
  390. handler(err,null);
  391. return;
  392. }
  393. logger.error("getMessagesByPage error" + err);
  394. ModelUtil.emitError(self.eventEmitter, err, err);
  395. } else {
  396. if(handler){
  397. handler(null,res);
  398. return;
  399. }
  400. ModelUtil.emitOK(self.eventEmitter, res);
  401. }
  402. })
  403. })
  404. })
  405. } else if (!start_msg_id) {
  406. redis.zrevrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  407. if (res.length == 0) {
  408. if(handler){
  409. handler(null,res);
  410. return;
  411. }
  412. ModelUtil.emitOK(self.eventEmitter, res);
  413. return;
  414. }
  415. start_msg_id = res[0];
  416. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  417. if (err) {
  418. if(handler){
  419. handler(err,null);
  420. return;
  421. }
  422. logger.error("getMessagesByPage error" + err);
  423. ModelUtil.emitError(self.eventEmitter, err, err);
  424. } else {
  425. if(handler){
  426. handler(null,res);
  427. return;
  428. }
  429. ModelUtil.emitOK(self.eventEmitter, res);
  430. }
  431. })
  432. })
  433. } else if (!end_msg_id) {
  434. redis.zrangeAsync(message_timestamp_key, 0, 0).then(function (res) {
  435. if (res.length == 0) {
  436. ModelUtil.emitOK(self.eventEmitter, res);
  437. return;
  438. }
  439. end_msg_id = res[0];
  440. self.getMessagesByPage(sessionId, user, start_msg_id, end_msg_id, page, pagesize, isoffset, function (err, res) {
  441. if (err) {
  442. if(handler){
  443. handler(err,null);
  444. return;
  445. }
  446. logger.error("getMessagesByPage error" + err);
  447. ModelUtil.emitError(self.eventEmitter, err, err);
  448. } else {
  449. if(handler){
  450. handler(null,res);
  451. return;
  452. }
  453. ModelUtil.emitOK(self.eventEmitter, res);
  454. }
  455. })
  456. })
  457. } else {
  458. self.getMessagesByPage(sessionId, user, end_msg_id, start_msg_id, page, pagesize, isoffset, function (err, res) {
  459. if (err) {
  460. if(handler){
  461. handler(err,null);
  462. return;
  463. }
  464. logger.error("getMessagesByPage error" + err);
  465. ModelUtil.emitError(self.eventEmitter, err, err);
  466. } else {
  467. if(handler){
  468. handler(null,res);
  469. return;
  470. }
  471. ModelUtil.emitOK(self.eventEmitter, res);
  472. }
  473. })
  474. }
  475. }
  476. /**
  477. * 分页获取会话消息。
  478. *
  479. * @param sessionId 必选。会话ID
  480. * @param userId 必选。用户ID
  481. * @param startMsgId 必选。会话的的起始消息ID,作为检索的起始依据
  482. * @param endMsgId 必选。会话中的结束消息ID
  483. * @param page 必选。页码
  484. * @param size 必选。页面大小
  485. * @param handler 必选。回调
  486. */
  487. getMessagesByPage(sessionId, userId, startMsgId, endMsgId, page, size, isoffset, handler) {
  488. let messagesTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  489. let messagesKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
  490. let participants = new Participants();
  491. let offset = (page - 1 < 0 ? 0 : page - 1) * size;
  492. let count = size;
  493. if (page > 1 || isoffset == 1) {
  494. offset += 1; // 翻页由于闭区间,需跳过本身数据
  495. }
  496. participants.existsParticipant(sessionId, userId, function (err, res) {
  497. if (!res) {
  498. handler(Error("User not found in session " + sessionId), null);
  499. } else {
  500. //将消息ID转换成分值
  501. redis.multi()
  502. .zscore(messagesTimestampKey, startMsgId)
  503. .zscore(messagesTimestampKey, endMsgId)
  504. .execAsync()
  505. .then(function (res) {
  506. let startMsgScore = res[1];
  507. let endMsgScore = res[0];
  508. if (startMsgScore == null || endMsgScore == null || startMsgScore == endMsgScore) {
  509. handler(null, []);
  510. return;
  511. }
  512. // 从消息时间表中过滤出要获取的消息ID列表,倒序取出消息
  513. redis.zrevrangebyscoreAsync(messagesTimestampKey, startMsgScore, endMsgScore, "limit", offset, count)
  514. .then(function (res) {
  515. if (res.length == 0) {
  516. handler(null, []);
  517. return;
  518. }
  519. redis.hmgetAsync(messagesKey, res).then(function (messages) {
  520. handler(null, messages);
  521. }).then(function () {
  522. Sessions.updateParticipantLastFetchTime(sessionId, userId, new Date().getTime());
  523. })
  524. })
  525. .catch(function (err) {
  526. logger.error("Get message by page failed: ", err);
  527. handler(err, false);
  528. })
  529. })
  530. }
  531. })
  532. }
  533. /**
  534. * 获取所有会话的未读消息数。
  535. */
  536. getAllSessionsUnreadMessageCount(userId) {
  537. let self = this;
  538. let count = 0;
  539. SessionRepo.findAll(userId,function(err,res){
  540. if(err){
  541. logger.err("getAllSessionsUnreadMessageCount is fail :"+err);
  542. return;
  543. }
  544. if(res.length==0){
  545. ModelUtil.emitOK(self.eventEmitter, {count: count});
  546. return;
  547. }
  548. for(var j in res){
  549. if(res[j].type==SESSION_TYPES.SYSTEM){
  550. if(j==res.length-1){
  551. ModelUtil.emitOK(self.eventEmitter, {count: count});
  552. }
  553. continue;
  554. }
  555. callback(res,j,res[j]);
  556. }
  557. })
  558. function callback(res,j,session){
  559. self.getSessionUnreadMessageCount(res[j].id,userId,function(err,con){
  560. if(err){
  561. logger.err("getAllSessionsUnreadMessageCount is fail :"+err);
  562. }
  563. count = count+con;
  564. if(j==res.length-1){
  565. ModelUtil.emitOK(self.eventEmitter, {count: count});
  566. }
  567. })
  568. }
  569. }
  570. /**
  571. * 获取会话的未读消息数。根据成员最后一次获取消息的时候与当前时间。
  572. *
  573. * @param sessionId
  574. * @param userId
  575. */
  576. getSessionUnreadMessageCount(sessionId, userId,handler) {
  577. let self = this;
  578. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  579. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  580. async.waterfall([
  581. // 此成员最后获取消息的时间
  582. function (callback) {
  583. redis.zscoreAsync(participantsKey, userId)
  584. .then(function (lastFetchTime) {
  585. callback(null, lastFetchTime);
  586. })
  587. },
  588. // 计算最后获取消息的时间之后到现在有多少条消息
  589. function (lastFetchTime, callback) {
  590. if (!lastFetchTime) lastFetchTime = 0;
  591. let now = new Date().getTime();
  592. redis.zcountAsync(messagesByTimestampKey, lastFetchTime, now)
  593. .then(function (count) {
  594. if(handler){
  595. handler(null,count);
  596. }else{
  597. ModelUtil.emitOK(self.eventEmitter, {count: count});
  598. }
  599. })
  600. }
  601. ], function (err, res) {
  602. if (err) {
  603. if(handler){
  604. handler(err,0);
  605. }else{
  606. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  607. }
  608. }
  609. });
  610. }
  611. /**
  612. * 获取会话未读消息数。根据成员最后一次获取消息的时候与当前时间。
  613. */
  614. getSessionUnreadMessages(sessionId, userId) {
  615. let self = this;
  616. let messagesByTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
  617. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  618. async.waterfall([
  619. // 此成员最后获取消息的时间
  620. function (callback) {
  621. redis.zscoreAsync(participantsKey, userId)
  622. .then(function (lastFetchTime) {
  623. callback(null, lastFetchTime);
  624. })
  625. },
  626. // 最后获取消息的时间之后到现在的消息ID列表
  627. function (lastFetchTime, callback) {
  628. if (!lastFetchTime) lastFetchTime = 0;
  629. let now = new Date().getTime();
  630. redis.zrangebyscoreAsync(messagesByTimestampKey, lastFetchTime, now)
  631. .then(function (messageIds) {
  632. callback(null, messageIds);
  633. })
  634. },
  635. // 获取消息
  636. function (messageIds, callback) {
  637. if (messageIds.length == 0) {
  638. ModelUtil.emitOK(self.eventEmitter, []);
  639. return;
  640. }
  641. let startMsgId = messageIds[0];
  642. let endMsgId = messageIds[messageIds.length - 1];
  643. self.getMessagesByPage(sessionId, userId, startMsgId, endMsgId, 0, messageIds.length, 0, function (err, res) {
  644. if (err) {
  645. ModelUtil.emitError(self.eventEmitter, err.message);
  646. return;
  647. }
  648. ModelUtil.emitOK(self.eventEmitter, res);
  649. });
  650. }
  651. ], function (err, res) {
  652. if (err) {
  653. ModelUtil.emitError(self.eventEmitter, "Get session unread message count failed.")
  654. }
  655. });
  656. }
  657. /**
  658. * 保存消息。
  659. *
  660. * 也可以根据议题保存消息,但最终还是保存到与会话对象。
  661. *
  662. * see also: saveMessageByTopic
  663. *
  664. * @param message
  665. * @param sessionId
  666. */
  667. saveMessageBySession(sessionId, message) {
  668. let self = this;
  669. let messages = new Messages();
  670. let participants = new Participants();
  671. let sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  672. let messageId = mongoose.Types.ObjectId().toString();
  673. message.id = messageId;
  674. // 检查会话中是否存在此成员
  675. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  676. if (err) {
  677. ModelUtil.emitError(self.eventEmitter, "Check session paticipant failed: ", err);
  678. return;
  679. }
  680. if (res) {
  681. redis.hmgetAsync(sessionKey, ["type", "name"]).then(function (res) {
  682. let sessionType = res[0];
  683. let sessionName = res[1];
  684. if (sessionType == null) {
  685. ModelUtil.emitError(self.eventEmitter, "Session with id " + sessionId + " not found.");
  686. return;
  687. }
  688. //消息保存到REDIS
  689. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  690. //更新REDIS最后一条消息
  691. Messages.updateLastContent(sessionKey, sessionType, sessionName, message);
  692. //更新用户最后一次获取消息的时间
  693. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  694. //更新最后一条消息到数据库
  695. SessionRepo.updateSessionLastStatus(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionId);
  696. //将消息保存到数据库
  697. messages.saveMessageToMysql(sessionId, sessionType, messageId, message, function (err, res) {
  698. if (err) {
  699. ModelUtil.emitError(self.eventEmitter, {message: "Failed to save message to mysql: " + err});
  700. } else {
  701. message.timestamp = message.timestamp.getTime();
  702. ModelUtil.emitOK(self.eventEmitter, {count: 1, messages: [message]});
  703. }
  704. });
  705. }).then(function (res) {
  706. // 推送消息
  707. ParticipantRepo.findIds(sessionId, function (err, res) {
  708. if (err) {
  709. ModelUtil.logError("Push message from session: get participant's id list failed: ", err);
  710. } else {
  711. message.session_id = sessionId;
  712. res.forEach(function (participant) {
  713. if (participant.id !== message.sender_id) {
  714. Sessions.pushNotification(participant.id, message);
  715. }
  716. });
  717. }
  718. })
  719. }).catch(function (err) {
  720. ModelUtil.emitError(self.eventEmitter, {message: "Error occurred while save message to session: " + err});
  721. })
  722. } else {
  723. ModelUtil.emitDataNotFound(self.eventEmitter, {message: "当前会话找不到此发送者"});
  724. }
  725. });
  726. }
  727. sendTopicMessages(topicId,message){
  728. let self = this;
  729. TopicRepo.findAllByTopicId(topicId,function(err,res){
  730. if(err||res.length==0){
  731. ModelUtil.emitOK(self.eventEmitter, {status:-1,"message": "议题获取失败"});
  732. return;
  733. }
  734. self.saveMessageByTopic(message,res[0].session_id,function(err,messageId){
  735. if(err){
  736. ModelUtil.emitOK(self.eventEmitter, {status:-1,"message":err});
  737. }else{
  738. message.id = messageId;
  739. ModelUtil.emitOK(self.eventEmitter, {status:200,"message":"发送成功",data:message});
  740. }
  741. });
  742. });
  743. }
  744. /**
  745. * 保存消息
  746. *
  747. * @param message
  748. * @param sessionId
  749. * @param handler
  750. */
  751. saveMessageByTopic(message, sessionId, handler) {
  752. let messages = new Messages();
  753. let participants = new Participants();
  754. let session_key = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
  755. let messageId = mongoose.Types.ObjectId().toString();
  756. let self = this;
  757. let sessionType = 0;
  758. let sessionName = "";
  759. message.id = messageId;
  760. // 发送成员必须处于会话中
  761. participants.existsParticipant(sessionId, message.sender_id, function (err, res) {
  762. if (res) {
  763. redis.hmgetAsync(session_key, ["type", "name"]).then(function (res) {
  764. sessionType = res[0];
  765. sessionName = res[1];
  766. if (!sessionType || !sessionName) {
  767. logger.error("session is error for key " + session_key);
  768. throw "session is not found";
  769. }
  770. }).then(function (res) {
  771. // 更新消息存储REDIS
  772. messages.saveMessageToRedis(sessionId, sessionType, messageId, message);
  773. //更新消息存储到mysql
  774. messages.saveMessageToMysql(sessionId, sessionType, messageId, message);
  775. // 更新会话最新状态及成员最后一次消息获取时间
  776. Sessions.updateParticipantLastFetchTime(sessionId, message.sender_id, message.timestamp.getTime());
  777. //更新最后一条消息
  778. Messages.updateLastContent(session_key, sessionType, sessionName, message);
  779. //更新session实体的最后一条消息
  780. SessionRepo.updateSessionLastStatus(message.sender_id,message.sender_name,message.timestamp,message.content,message.content_type,sessionId);
  781. handler(null, messageId);
  782. }).then(function (res) {
  783. // 推送消息
  784. ParticipantRepo.findIds(sessionId, function (err, res) {
  785. if (err) {
  786. handler(err, messageId)
  787. } else {
  788. message.session_id = sessionId;
  789. res.forEach(function (participant) {
  790. if (participant.id !== message.sender_id) {
  791. Sessions.pushNotification(participant.id, message);
  792. }
  793. });
  794. }
  795. })
  796. }).catch(function (err) {
  797. handler(err, messageId)
  798. })
  799. } else {
  800. handler("用户不在此会话当中!", messageId);
  801. }
  802. })
  803. }
  804. /**
  805. * 置顶操作
  806. */
  807. stickSession(sessionId, user) {
  808. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  809. let self = this;
  810. //取出最大的session
  811. redis.zrevrangeAsync(user_session_key, 0, 0).then(function (res) {
  812. //获取该session的时间搓
  813. redis.zscoreAsync(user_session_key, res).then(function (scoreres) {
  814. let nowtime = new Date().getTime();
  815. //当前时间搓比redis的时间搓更早证明没有置顶过
  816. if (scoreres <= nowtime) {
  817. //初始化置顶
  818. redis.zaddAsync(user_session_key, STICKY_SESSION_BASE_SCORE, sessionId).then(function (res) {
  819. logger.info("stickSession:" + sessionId + ",res:" + res);
  820. ModelUtil.emitOK(self.eventEmitter, {});
  821. }).then(function () {
  822. SessionRepo.saveStickySession(sessionId, user, STICKY_SESSION_BASE_SCORE);
  823. })
  824. } else {
  825. //已有置顶的数据,取出来加1保存回去
  826. scoreres = Number(scoreres) + 1;
  827. redis.zaddAsync(user_session_key, scoreres, sessionId).then(function () {
  828. logger.info("stickSession:" + sessionId + ",res:" + res);
  829. ModelUtil.emitOK(self.eventEmitter, {});
  830. }).then(function () {
  831. SessionRepo.saveStickySession(sessionId, user, scoreres);
  832. })
  833. }
  834. })
  835. })
  836. }
  837. /**
  838. * 取消会话置顶
  839. */
  840. cancelStickSession(sessionId, user) {
  841. let user_session_key = RedisModel.makeRedisKey(REDIS_KEYS.UserSessions, user);
  842. let participants_key = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  843. let self = this;
  844. redis.zscoreAsync(participants_key, user).then(function (res) {
  845. if (!res) {
  846. res = new Date().getTime();
  847. }
  848. redis.zaddAsync(user_session_key, res, sessionId).then(function (res) {
  849. logger.info("cancelStickSession:" + sessionId);
  850. ModelUtil.emitOK(self.eventEmitter, {});
  851. }).then(function () {
  852. SessionRepo.unstickSession(sessionId, user);
  853. });
  854. })
  855. }
  856. /**
  857. * 更新会话参与者的最后消息获取时间。
  858. *
  859. * @param sessionId
  860. * @param userId
  861. */
  862. static updateParticipantLastFetchTime(sessionId, userId, score) {
  863. score = score+1;
  864. let participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
  865. redis.zaddAsync(participantsKey, score, userId)
  866. .then(function (res) {
  867. ParticipantRepo.updateLastFetchTime(new Date(score), sessionId, userId, function (err, res) {
  868. if (err) {
  869. logger.error("Update participant last fetch time failed: ", err);
  870. }
  871. });
  872. })
  873. .catch(function (err) {
  874. logger.error("Update participant last fetch time failed: ", err);
  875. });
  876. }
  877. /**
  878. * 向用户推送通知,微信端用户直接推送消息,APP端通过个推发送通知消息。
  879. *
  880. * @param targetUserId
  881. * @param message
  882. */
  883. static pushNotification(targetUserId, message) {
  884. Users.isPatientId(targetUserId, function (err, isPatient) {
  885. if (isPatient) {
  886. WechatClient.sendMessage(targetUserId, message);
  887. }
  888. else {
  889. AppClient.sendNotification(targetUserId, message);
  890. }
  891. });
  892. }
  893. }
  894. // Expose class
  895. module.exports = Sessions;