pubSub.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. /*
  2. * Redis publish/subscribe helper.
  3. * Example:
  4. * let channel="ryan";
  5. * redis.pubSub.registerHandlers("ryan",msg=> console.log(msg));
  6. * redis.pubSub.subscribe(channel);
  7. * redis.pubSub.publish(channel,"hello from chen");
  8. */
  9. "use strict";
  10. let configFile = require('../../include/commons').CONFIG_FILE;
  11. let config = require('../../resources/config/' + configFile);
  12. let RedisModel = require('./../redis.model');
  13. let RedisSubClient = require('./redisSubClient');
  14. let RedisPubClient = require('./redisPubClient');
  15. let RedisClient = require('../../repository/redis/redis.client.js');
  16. let redisPubConn = RedisPubClient.redisClient().connection;
  17. let redisSubConn = RedisSubClient.redisClient().connection;
  18. let redis = RedisClient.redisClient().connection;
  19. let Sessions = require('../../models/sessions/sessions');
  20. let WechatClient = require("../client/wechat.client.js");
  21. let WlyySDK = require("../../util/wlyy.sdk");
  22. let AppClient = require("../client/app.client.js");
  23. let Participants = require('../sessions/participants');
  24. let Messages = require('../messages/messages');
  25. const REDIS_KEYS = require('../../include/commons').REDIS_KEYS;
  26. let ObjectUtil = require("../../util/object.util.js");
  27. let logger = require('../../util/log.js');
  28. let SessionRepo = require('../../repository/mysql/session.repo');
  29. let ParticipantRepo = require('../../repository/mysql/participant.repo');
  30. const SESSION_TYPES = require('../../include/commons').SESSION_TYPES;
  31. const SESSION_BUSINESS_TYPE = require('../../include/commons').SESSION_BUSINESS_TYPE;
  32. class PubSub{
  33. constructor(){
  34. this.sub=redisSubConn;
  35. this.handlers=new Map();
  36. this.subAction=(channle,message)=>{
  37. let actions= this.handlers.get(channle)||new Set();
  38. for(let action of actions)
  39. {
  40. console.log("接收消息:"+message);
  41. message = JSON.parse(message);
  42. if(config.pubSubSwitch){//接收订阅消息处理开关,本地运行和测试库单独运行时防止用户接收消息2次
  43. //Sessions.getRedisPushNotification(message);这里不知为什么无法调用这个方法,提示getRedisPushNotification不是方法
  44. if (message.targetType=='patient') {
  45. if(config.environment!='local'){//pc版接收要发给居民的消息不做处理
  46. WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
  47. }
  48. } else {
  49. if(message.sessionType=="1"){
  50. WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
  51. }
  52. //告知医生新消息
  53. WechatClient.sendSocketMessageToDoctor(message.targetUserId,message);
  54. if(config.environment!='local'){//pc版不推送个推
  55. WlyySDK.request(message.targetUserId, '', '', '', '/im/common/message/messages', 'POST', function (err, res) {
  56. let count = 0;
  57. res = JSON.parse(res)
  58. if (res.status == 200) {
  59. let data = res.data;
  60. count = parseInt(JSON.parse(data.imMsgCount).count) + parseInt(data.system.amount) + parseInt(data.healthIndex.amount) + parseInt(data.sign.amount);
  61. }
  62. AppClient.sendNotification(message.targetUserId, message,message.sessionType,count);
  63. });
  64. }
  65. //外网pcim通过socket推送
  66. WechatClient.sendPcImSocket(message.targetUserId,message,message.sessionType);
  67. }
  68. }
  69. //action(message);
  70. }
  71. }
  72. this.alredyPublishs=[];
  73. this.subConnected=false;
  74. }
  75. publish(channel,message)
  76. {
  77. let action=()=>{
  78. let pub=redisPubConn;
  79. pub.publish(channel,message);
  80. console.log("发布消息:channel:"+channel+",message:"+message);
  81. };
  82. if(this.subConnected===false)
  83. {
  84. this.alredyPublishs.push(action);
  85. }
  86. else{
  87. action();
  88. }
  89. }
  90. registerHandlers(channel,action)
  91. {
  92. var actions=this.handlers.get(channel)||new Set();
  93. actions.add(action);
  94. this.handlers.set(channel,actions);
  95. }
  96. subscribe(channel)
  97. {
  98. let self=this;
  99. this.sub.subscribe(channel,function (err,reply) {
  100. if(err){
  101. log.error(err);
  102. }
  103. self.subConnected=true;
  104. for(let publish of self.alredyPublishs){
  105. publish();
  106. }
  107. console.log("订阅成功:"+reply);
  108. });
  109. this.sub.on("message", function (channel, message) {
  110. self.subAction(channel,message);
  111. });
  112. }
  113. tearDown()
  114. {
  115. this.sub.quit();
  116. }
  117. }
  118. // Export one PubSub instance created at load time (the class itself is not exported).
  119. module.exports = new PubSub();