pubSub.js 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. /*
  2. * redis发布订阅
  3. *example:
  4. * let channel="ryan";
  5. redis.pubSub.registerHandlers("ryan",msg=> console.log(msg));
  6. redis.pubSub.subscribe(channel);
  7. redis.pubSub.publish(channel,"hello from chen");
  8. */
  9. "use strict";
  10. let configFile = require('../../include/commons').CONFIG_FILE;
  11. let config = require('../../resources/config/' + configFile);
  12. let RedisModel = require('./../redis.model');
  13. let RedisSubClient = require('./redisSubClient');
  14. let RedisPubClient = require('./redisPubClient');
  15. let RedisClient = require('../../repository/redis/redis.client.js');
  16. let redisPubConn = RedisPubClient.redisClient().connection;
  17. let redisSubConn = RedisSubClient.redisClient().connection;
  18. let redis = RedisClient.redisClient().connection;
  19. let Sessions = require('../../models/sessions/sessions');
  20. let WechatClient = require("../client/wechat.client.js");
  21. let WlyySDK = require("../../util/wlyy.sdk");
  22. let AppClient = require("../client/app.client.js");
  23. let Participants = require('../sessions/participants');
  24. let Messages = require('../messages/messages');
  25. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  26. let ObjectUtil = require("../../util/object.util.js");
  27. let logger = require('../../util/log.js');
  /**
   * Redis publish/subscribe hub for IM messages.
   *
   * Publishing goes through `redisPubConn`, subscribing through `redisSubConn`.
   * Every message received on a channel that has at least one registered
   * handler is run through the built-in pipeline: verify the sender is a
   * participant of the session, store the message in Redis, update the session
   * state and, when `config.pubSubSwitch` is on, push notifications to the
   * target user.
   *
   * Public members (kept for backward compatibility, including the historical
   * spelling of `alredyPublishs`):
   *   - sub            the subscriber connection
   *   - handlers       Map<channel, Set<function>>
   *   - subAction      function (channel, rawMessage) invoked per received message
   *   - alredyPublishs publish thunks queued until the first subscription succeeds
   *   - subConnected   true once a subscribe call has been acknowledged
   */
  class PubSub {
    constructor() {
      this.sub = redisSubConn;
      this.handlers = new Map();
      this.subAction = (channel, message) => {
        const actions = this.handlers.get(channel) || new Set();
        // NOTE(review): the pipeline runs once per registered handler and the
        // handler itself is never invoked (the original call was commented
        // out). That behaviour is preserved here - confirm it is intended.
        actions.forEach(() => {
          console.log("接收消息:" + message);
          // The raw payload is parsed into a fresh object on every pass; the
          // previous code overwrote `message` with the parsed object, so a
          // second handler made JSON.parse throw.
          this._processMessage(message);
        });
      };
      this.alredyPublishs = [];
      this.subConnected = false;
      // Guards against attaching the "message" listener more than once.
      this._messageListenerAttached = false;
    }

    /**
     * Parse one raw payload and, if its sender belongs to the session, store
     * it and trigger notifications. Never throws on malformed input.
     *
     * @param {string} rawMessage JSON text received from the channel
     */
    _processMessage(rawMessage) {
      let message;
      try {
        message = JSON.parse(rawMessage);
      } catch (ex) {
        logger.error("Parse subscribed message failed: ", ex);
        return;
      }
      if (message === null || typeof message !== "object") {
        logger.error("Parse subscribed message failed: ", rawMessage);
        return;
      }

      message.timestamp = new Date(message.timestamp);
      const sessionId = message.session_id;
      const sessionKey = RedisModel.makeRedisKey(REDIS_KEYS.Session, sessionId);
      const participants = new Participants();

      // Check that the sender is a member of this session.
      participants.existsParticipant(sessionId, message.sender_id, (err, exists) => {
        if (!exists) {
          if (err) {
            // A lookup failure is reported as such instead of as "not found".
            logger.error("Check session participant failed: ", err);
          } else {
            logger.error({message: "当前会话找不到此发送者"});
          }
          return;
        }

        redis.hmgetAsync(sessionKey, ["type", "name"]).then((fields) => {
          const sessionType = fields[0];
          const sessionName = fields[1];
          if (sessionType) {
            this._storeAndNotify(message, sessionId, sessionKey, sessionType, sessionName);
          }
        }).catch((ex) => {
          logger.error({message: "Error occurred while save message to session: " + ex});
        });
      });
    }

    /**
     * Save the message to Redis, update the session's last state and the
     * sender's last-fetch score, then push notifications.
     *
     * @param {object} message parsed message whose `timestamp` is a Date
     * @param {string} sessionId
     * @param {string} sessionKey Redis key of the session hash
     * @param {*} sessionType value of the session's "type" field
     * @param {*} sessionName value of the session's "name" field
     */
    _storeAndNotify(message, sessionId, sessionKey, sessionType, sessionName) {
      const messageKey = RedisModel.makeRedisKey(REDIS_KEYS.Messages, sessionId);
      const messageTimestampKey = RedisModel.makeRedisKey(REDIS_KEYS.MessagesByTimestamp, sessionId);
      const sentAt = message.timestamp.getTime();
      const msgJson = {
        id: message.id,
        sender_id: message.sender_id,
        sender_name: message.sender_name,
        timestamp: sentAt,
        content_type: message.content_type,
        content: message.content,
        business_type: message.business_type || 1
      };

      redis.multi()
        .hset(messageKey, message.id, JSON.stringify(msgJson)) // message body
        .zadd(messageTimestampKey, sentAt, message.id) // message time index
        .execAsync()
        .then(() => {
          Messages.updateLastContent(sessionKey, sessionType, null, message);
          Messages.cleanOutRangeMessage(sessionId); // clean out range messages
        })
        .catch((ex) => {
          logger.error("Save message to redis failed: ", ex);
        });

      Messages.updateLastContent(sessionKey, sessionType, sessionName, message);

      const participantsKey = RedisModel.makeRedisKey(REDIS_KEYS.SessionParticipants, sessionId);
      // The push must only happen once the message is in the cache, otherwise
      // a user pulling the message list may miss it. Both commands are issued
      // on the same `redis` connection, save first.
      // NOTE(review): this relies on the client answering commands in order - confirm.
      redis.zaddAsync(participantsKey, sentAt + 1, message.sender_id)
        .then(() => {
          // Switch for handling subscribed messages; lets local and test
          // deployments avoid delivering the same message twice.
          if (config.pubSubSwitch) {
            this._pushNotification(message);
          }
        })
        .catch((ex) => {
          logger.error("Update participant last fetch time failed: ", ex);
        });
    }

    /**
     * Deliver the message to its target user over the configured channels.
     *
     * @param {object} message parsed message carrying targetType, targetUserId,
     *                         targetUserName and sessionType
     */
    _pushNotification(message) {
      const isLocal = config.environment === "local";

      if (message.targetType === "patient") {
        // The PC (local) build does not forward messages addressed to patients.
        if (!isLocal) {
          WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
        }
        return;
      }

      if (String(message.sessionType) === "1") {
        WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
      }
      // Tell the doctor about the new message.
      WechatClient.sendSocketMessageToDoctor(message.targetUserId, message);

      // The PC (local) build does not send app push notifications.
      if (!isLocal) {
        WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', (err, res) => {
          let count = 0;
          try {
            if (err) {
              throw err;
            }
            const body = JSON.parse(res);
            if (Number(body.status) === 200) {
              const data = body.data;
              count = parseInt(JSON.parse(data.imMsgCount).count, 10)
                + parseInt(data.system.amount, 10)
                + parseInt(data.healthIndex.amount, 10)
                + parseInt(data.sign.amount, 10);
            }
          } catch (ex) {
            // A broken counter response must not block the notification.
            logger.error("Parse unread message count failed: ", ex);
            count = 0;
          }
          AppClient.sendNotification(message.targetUserId, message, message.sessionType, count);
        });
      }

      // External PC IM clients are pushed through the socket.
      WechatClient.sendPcImSocket(message.targetUserId, message, message.sessionType);
    }

    /**
     * Publish a message, or queue it until the first subscription succeeds.
     *
     * @param {string} channel
     * @param {string} message
     */
    publish(channel, message) {
      const action = () => {
        redisPubConn.publish(channel, message);
        console.log("发布消息:channel:" + channel + ",message:" + message);
      };

      if (this.subConnected === false) {
        this.alredyPublishs.push(action);
      } else {
        action();
      }
    }

    /**
     * Register a handler for a channel.
     *
     * @param {string} channel
     * @param {function} action
     */
    registerHandlers(channel, action) {
      const actions = this.handlers.get(channel) || new Set();
      actions.add(action);
      this.handlers.set(channel, actions);
    }

    /**
     * Subscribe to a channel. On success the queued publishes are flushed
     * exactly once; on failure the error is logged and the queue is kept.
     *
     * @param {string} channel
     */
    subscribe(channel) {
      // One listener serves every channel; adding one per call would make
      // each message be handled once per subscribed channel.
      if (!this._messageListenerAttached) {
        this._messageListenerAttached = true;
        this.sub.on("message", (receivedChannel, message) => {
          this.subAction(receivedChannel, message);
        });
      }

      this.sub.subscribe(channel, (err, reply) => {
        if (err) {
          logger.error(err);
          return;
        }
        this.subConnected = true;
        // Empty the queue in place so a later subscribe does not re-publish.
        const pending = this.alredyPublishs.splice(0);
        for (const publish of pending) {
          publish();
        }
        console.log("订阅成功:" + reply);
      });
    }

    /**
     * Close the subscriber connection.
     */
    tearDown() {
      this.sub.quit();
    }
  }
  168. // Export one PubSub instance created when this module is loaded (the class itself is not exported)
  169. module.exports = new PubSub();