pubSub.js 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
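  /*
   * Redis publish/subscribe helper.
   *
   * Example:
   *   let channel = "ryan";
   *   redis.pubSub.registerHandlers("ryan", msg => console.log(msg));
   *   redis.pubSub.subscribe(channel);
   *   redis.pubSub.publish(channel, "hello from chen");
   *
   * NOTE(review): the callback passed to registerHandlers() is stored but the
   * call that would invoke it is commented out in subAction; registering a
   * handler currently only enables the built-in notification dispatch for
   * that channel.
   */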
  9. "use strict";
  10. let configFile = require('../../include/commons').CONFIG_FILE;
  11. let config = require('../../resources/config/' + configFile);
  12. let RedisModel = require('./../redis.model');
  13. let RedisSubClient = require('./redisSubClient');
  14. let RedisPubClient = require('./redisPubClient');
  15. let redisPubConn = RedisPubClient.redisClient().connection;
  16. let redisSubConn = RedisSubClient.redisClient().connection;
  17. let Sessions = require('../sessions/sessions.js');
  18. let WechatClient = require("../client/wechat.client.js");
  19. let WlyySDK = require("../../util/wlyy.sdk");
  20. let AppClient = require("../client/app.client.js");
  /**
   * Redis publish/subscribe hub.
   *
   * Subscribes to channels on the shared subscriber connection and publishes
   * on the shared publisher connection. Messages received on a channel that
   * has at least one registered handler are parsed as JSON and forwarded to
   * the WeChat / app / socket clients according to `targetType`.
   *
   * Publishes requested before the first subscribe callback has fired are
   * queued in `alredyPublishs` and sent once that callback runs.
   */
  class PubSub {
    constructor() {
      // Connection used for subscribe() and for receiving "message" events.
      this.sub = redisSubConn;
      // channel name -> Set of callbacks added through registerHandlers().
      this.handlers = new Map();
      // Kept as an instance property so existing callers of subAction still work.
      this.subAction = (channel, message) => this.handleMessage(channel, message);
      // Publish thunks waiting for the first subscribe callback (name kept for compatibility).
      this.alredyPublishs = [];
      this.subConnected = false;
      // The "message" listener must be attached only once per connection,
      // otherwise every message is processed once per subscribe() call.
      this.messageListenerAttached = false;
    }

    /**
     * Process one raw message received on `channel`.
     *
     * The built-in dispatch runs once per handler registered for the channel.
     * The registered callbacks themselves are not invoked; they only mark the
     * channel as handled. Nothing is dispatched when `config.pubSubSwitch` is
     * falsy (the switch exists so that local/test runs do not deliver a
     * message to users a second time).
     *
     * @private
     * @param {string} channel - Channel the message arrived on.
     * @param {string} rawMessage - JSON text of the message.
     */
    handleMessage(channel, rawMessage) {
      const registered = this.handlers.get(channel);
      const handlerCount = registered ? registered.size : 0;

      let payload = null;
      let parsed = false;
      for (let i = 0; i < handlerCount; i += 1) {
        console.log("接收消息:" + rawMessage);
        if (!config.pubSubSwitch) {
          continue;
        }
        // Parse once: re-parsing the already parsed object on the second
        // iteration would throw.
        if (!parsed) {
          payload = this.parseMessage(rawMessage);
          parsed = true;
        }
        if (payload !== null) {
          this.dispatchMessage(payload);
        }
        // action(message) is intentionally not called here.
      }
    }

    /**
     * Parse a raw message into an object.
     *
     * @private
     * @param {string} rawMessage - JSON text.
     * @returns {Object|null} The parsed object, or null when the text is not
     *   valid JSON or does not describe an object (the problem is logged).
     */
    parseMessage(rawMessage) {
      let parsed;
      try {
        parsed = JSON.parse(rawMessage);
      } catch (err) {
        console.error(err);
        return null;
      }
      if (parsed === null || typeof parsed !== "object") {
        console.error("pubSub: ignoring message whose payload is not an object");
        return null;
      }
      return parsed;
    }

    /**
     * Forward a parsed message to the matching clients.
     *
     * @private
     * @param {Object} message - Parsed message; reads targetType, targetUserId,
     *   targetUserName and sessionType.
     */
    dispatchMessage(message) {
      const isLocal = config.environment === "local";

      if (message.targetType === "patient") {
        // The local (PC) build does not handle messages addressed to patients.
        if (!isLocal) {
          WechatClient.sendMessage(message.targetUserId, message.targetUserName, message);
        }
        return;
      }

      // sessionType may arrive as the number 1 or the string "1".
      if (String(message.sessionType) === "1") {
        WechatClient.sendReadDoctorByDoctorId(message.targetUserId, message);
      }
      // Tell the doctor about the new message.
      WechatClient.sendSocketMessageToDoctor(message.targetUserId, message);

      // The local (PC) build sends no app push.
      if (isLocal) {
        return;
      }
      WlyySDK.request(message.targetUserId, "", "", "", "/im/common/message/messages", "POST", (err, res) => {
        const count = this.countUnreadMessages(err, res);
        AppClient.sendNotification(message.targetUserId, message, message.sessionType, count);
        // The external PC IM client is notified through its socket.
        WechatClient.sendPcImSocket(message.targetUserId, message, message.sessionType);
      });
    }

    /**
     * Compute the count sent with the app notification from the
     * `/im/common/message/messages` response.
     *
     * @private
     * @param {*} err - Error reported by the request, if any.
     * @param {string} res - Response body as JSON text.
     * @returns {number} Sum of the IM, system, health-index and sign counters
     *   when the response status is 200; 0 when the request failed, the status
     *   differs, or the response cannot be read (failures are logged).
     */
    countUnreadMessages(err, res) {
      if (err) {
        console.error(err);
        return 0;
      }
      try {
        const body = JSON.parse(res);
        if (Number(body.status) !== 200) {
          return 0;
        }
        const data = body.data;
        const total =
          parseInt(JSON.parse(data.imMsgCount).count, 10) +
          parseInt(data.system.amount, 10) +
          parseInt(data.healthIndex.amount, 10) +
          parseInt(data.sign.amount, 10);
        return Number.isFinite(total) ? total : 0;
      } catch (parseErr) {
        console.error(parseErr);
        return 0;
      }
    }

    /**
     * Publish `message` on `channel`. If no subscribe callback has fired yet,
     * the publish is queued and sent when the first one does.
     *
     * @param {string} channel - Channel to publish on.
     * @param {string} message - Message text.
     */
    publish(channel, message) {
      const action = () => {
        redisPubConn.publish(channel, message);
        console.log("发布消息:channel:" + channel + ",message:" + message);
      };
      if (this.subConnected === false) {
        this.alredyPublishs.push(action);
      } else {
        action();
      }
    }

    /**
     * Register `action` for `channel`. Messages are only processed for
     * channels that have at least one registered action.
     *
     * @param {string} channel - Channel name.
     * @param {Function} action - Callback to associate with the channel.
     */
    registerHandlers(channel, action) {
      const actions = this.handlers.get(channel) || new Set();
      actions.add(action);
      this.handlers.set(channel, actions);
    }

    /**
     * Subscribe to `channel`. When the subscribe callback fires, the hub is
     * marked connected and queued publishes are sent exactly once.
     *
     * @param {string} channel - Channel to subscribe to.
     */
    subscribe(channel) {
      this.sub.subscribe(channel, (err, reply) => {
        if (err) {
          console.error(err);
        }
        this.subConnected = true;
        // Empty the queue in place so a later subscribe callback does not
        // publish the same messages again.
        const pending = this.alredyPublishs.splice(0);
        for (const sendQueued of pending) {
          sendQueued();
        }
        console.log("订阅成功:" + reply);
      });

      if (!this.messageListenerAttached) {
        this.messageListenerAttached = true;
        this.sub.on("message", (receivedChannel, message) => {
          this.subAction(receivedChannel, message);
        });
      }
    }

    /**
     * Close the subscriber connection.
     */
    tearDown() {
      this.sub.quit();
    }
  }
  // Export a single PubSub instance (not the class itself).
  const sharedPubSub = new PubSub();
  module.exports = sharedPubSub;